Move persistence to server-side with Effect and SQLite#88
Move persistence to server-side with Effect and SQLite#88juliusmarminge wants to merge 1 commit intomainfrom
Conversation
- Add @t3tools/core package with Effect-based domain model - Add @t3tools/infra-sqlite package for SQLite persistence - Add CoreRuntime to server integrating the new packages - Remove client-side persistence (persistenceSchema.ts, store.test.ts) - Simplify web store by delegating persistence to server - Update contracts with new IPC and WebSocket message types Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
WalkthroughIntroduces comprehensive event-sourcing architecture with a new Changes
Estimated code review effort🎯 5 (Critical) | ⏱️ ~120 minutes 🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Comment |
Move application state persistence to the server by routing WebSocket requests through
|
Greptile SummaryThis PR implements a major architectural shift from client-side to server-side persistence using event sourcing and CQRS patterns. Key Changes:
Architecture: Benefits:
Confidence Score: 4/5
Important Files Changed
Sequence DiagramsequenceDiagram
participant Client as Web Client
participant WS as WebSocket Server
participant Runtime as CoreRuntime
participant Engine as OrchestrationEngine
participant Store as SQLite EventStore
participant Projector as QueueProjector
participant Fanout as EffectFanout
Client->>WS: state.createThread(params)
WS->>Runtime: dispatch(ThreadCreateCommand)
Runtime->>Engine: execute(command)
Engine->>Engine: decide(command, currentState)
Engine->>Store: append(events)
Store-->>Engine: DomainEventEnvelope[]
Engine->>Projector: enqueue(events)
Projector->>Projector: drain() - reduce events
Projector->>Fanout: publish(StateUpdateNotification)
Fanout-->>WS: AsyncIterable<StateUpdate>
WS->>Client: push(state.updated, newState)
Client->>Client: SET_SERVER_STATE action
Last reviewed commit: ac44561 |
| import { reduceEvents } from "../projections/reducer"; | ||
|
|
||
| export class QueueProjector { | ||
| private readonly queue: ReadonlyArray<DomainEventEnvelope>[] = []; |
There was a problem hiding this comment.
queue declared as ReadonlyArray but mutated with push() and shift()
| private readonly queue: ReadonlyArray<DomainEventEnvelope>[] = []; | |
| private readonly queue: DomainEventEnvelope[][] = []; |
| const onTerminalEvent = (event: TerminalEvent) => { | ||
| const push: WsPush = { | ||
| broadcastPush({ | ||
| type: "push", | ||
| channel: WS_CHANNELS.terminalEvent, | ||
| data: event, | ||
| }; | ||
| const message = JSON.stringify(push); | ||
| let recipients = 0; | ||
| for (const client of clients) { | ||
| if (client.readyState === client.OPEN) { | ||
| client.send(message); | ||
| recipients += 1; | ||
| } | ||
| }); | ||
| if (event.type === "activity" || event.type === "exited" || event.type === "error") { | ||
| void coreRuntime.dispatch({ | ||
| id: crypto.randomUUID(), | ||
| type: "thread.setTerminalActivity", | ||
| issuedAt: event.createdAt, | ||
| payload: { | ||
| threadId: event.threadId, | ||
| terminalId: event.terminalId, | ||
| running: event.type === "activity" ? event.hasRunningSubprocess : false, | ||
| }, | ||
| }); | ||
| } | ||
| logOutgoingPush(push, recipients); | ||
| }; | ||
| terminalManager.on("event", onTerminalEvent); |
There was a problem hiding this comment.
terminal events dispatched twice - once in onTerminalEvent (line 133-142) and once via coreRuntime.bindTerminalEvents() (line 146). The bindTerminalEvents method is unused and should be removed, or this inline handler should be removed.
There was a problem hiding this comment.
Actionable comments posted: 9
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
apps/web/src/components/ChatView.tsx (1)
1072-1094:⚠️ Potential issue | 🟡 MinorMissing error handling for
setRuntimeModeAPI call.The function dispatches the local state change optimistically before calling
api.state.setRuntimeMode(), but if the API call fails, the local state will be out of sync with the server. Consider either:
- Rolling back the local state on failure, or
- Wrapping the call in a try-catch to handle errors gracefully
Suggested improvement
const handleRuntimeModeChange = async (mode: "approval-required" | "full-access") => { if (mode === state.runtimeMode) return; dispatch({ type: "SET_RUNTIME_MODE", mode }); scheduleComposerFocus(); if (!api) return; - await api.state.setRuntimeMode({ mode }); + try { + await api.state.setRuntimeMode({ mode }); + } catch (err) { + // Rollback on failure + dispatch({ type: "SET_RUNTIME_MODE", mode: state.runtimeMode }); + return; + } const sessionIds = state.threads🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/web/src/components/ChatView.tsx` around lines 1072 - 1094, The optimistic dispatch in handleRuntimeModeChange currently calls api.state.setRuntimeMode without error handling, so if that API fails the local state (dispatch({ type: "SET_RUNTIME_MODE", mode })) will be inconsistent; wrap the api.state.setRuntimeMode call in a try-catch and on error dispatch a rollback (dispatch the previous runtime mode) and surface or log the error, ensuring you still call scheduleComposerFocus only after success or restore focus state appropriately; keep the existing session stop logic and setIsSwitchingRuntimeMode behavior but ensure setIsSwitchingRuntimeMode is only triggered after the API call succeeds (or is reverted on failure).apps/web/src/wsNativeApi.ts (1)
113-124:⚠️ Potential issue | 🟠 MajorProvider event stream is no longer consumed.
providers.onEventis a no-op, so the web app stops consuming theproviders.eventstream entirely. This breaks the required provider event flow.
As per coding guidelines: Web app must consume provider event streams via WebSocket push on channelproviders.event.🔧 Suggested fix
- onEvent: () => () => {}, + onEvent: (callback) => + transport.subscribe(WS_CHANNELS.providerEvent, callback as (data: unknown) => void),🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/web/src/wsNativeApi.ts` around lines 113 - 124, providers.onEvent is currently a no-op, so the providers.event WebSocket stream isn't consumed; replace the no-op with an implementation that subscribes to the push channel (use WS_METHODS.providersEvent) and returns an unsubscribe function. Concretely, change the providers.onEvent implementation to accept a callback and call the transport subscription helper (e.g. transport.subscribe or the equivalent in this file) with WS_METHODS.providersEvent and that callback, then return the unsubscribe function from transport so the caller can stop listening.apps/web/src/components/Sidebar.tsx (2)
382-400:⚠️ Potential issue | 🟠 MajorNon-Electron project deletion does nothing after confirmation.
In the non-Electron branch (when
isElectronis false), the function confirms deletion with the user but then exits without calling any API to remove the project. Thedispatchcall was likely removed during migration but not replaced with an API call.🐛 Proposed fix to handle non-Electron project removal
if (isElectron) { try { await api.projects.remove({ id: projectId }); } catch (error) { const message = error instanceof Error ? error.message : "Unknown error deleting project."; console.error("Failed to remove project", { projectId, error }); toastManager.add({ type: "error", title: `Failed to delete "${project.name}"`, description: message, }); return; } + } else { + await api.projects.remove({ id: projectId }); } - },🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/web/src/components/Sidebar.tsx` around lines 382 - 400, The non-Electron branch after user confirmation currently does nothing; update the else (isElectron === false) path in the project deletion handler in Sidebar.tsx to call await api.projects.remove({ id: projectId }) (same as the Electron branch), wrap it in try/catch, log and toast errors using the existing toastManager pattern, and on success dispatch the existing state update (remove project from state.projects and cleanup state.threads) so the UI is updated; reference api.projects.remove, projectId, toastManager, dispatch, state.projects, and state.threads to locate and mirror the Electron-branch behavior.
142-170:⚠️ Potential issue | 🟠 MajorHandle the case when
apiis undefined before awaiting.The
handleNewThreadfunction awaitsapi?.state.createThread(...)at line 155. Ifapiis undefined, this evaluates toundefinedand the navigation proceeds without the thread being created on the server, leading to inconsistent state.🐛 Proposed fix to guard against undefined API
async ( projectId: string, options?: { branch?: string | null; worktreePath?: string | null; }, ) => { + if (!api) return; const thread = createThread(projectId, { model: state.projects.find((project) => project.id === projectId)?.model ?? DEFAULT_MODEL, branch: options?.branch ?? null, worktreePath: options?.worktreePath ?? null, }); - await api?.state.createThread({ + await api.state.createThread({ id: thread.id,🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/web/src/components/Sidebar.tsx` around lines 142 - 170, The handler handleNewThread currently uses optional chaining on api when awaiting createThread, so if api is undefined the server call is skipped but navigation still proceeds; fix this by guarding early: check api is defined (e.g., if (!api) { handle error/notify user and return }) before calling api.state.createThread, then call await api.state.createThread(...) (no optional chaining) inside a try/catch and only call navigate after the awaited createThread succeeds; reference handleNewThread, api, api.state.createThread, and navigate when making the change.
🧹 Nitpick comments (12)
packages/contracts/src/ipc.ts (2)
45-45: Consolidate RuntimeMode to a single canonical type.RuntimeMode is now defined in multiple places; exporting it from contracts (or importing a shared definition) will prevent drift across IPC/UI/core.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/contracts/src/ipc.ts` at line 45, Define a single canonical RuntimeMode type in the contracts package by exporting the existing type alias RuntimeMode ("approval-required" | "full-access") from the contracts IPC module and then remove duplicate local definitions elsewhere; update other modules (IPC, UI, core) to import RuntimeMode from that exported symbol instead of redefining it so all consumers share the same exported RuntimeMode type.
144-158: Tighten the state API return types.
Promise<unknown>weakens the IPC contract and forces casting. Prefer explicit result types (e.g., AppViewState, ThreadView, or dedicated response interfaces) to keep the API self-describing and type-safe.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/contracts/src/ipc.ts` around lines 144 - 158, The state API currently returns Promise<unknown> for methods (getSnapshot, createThread, deleteThread, markThreadVisited, setRuntimeMode); replace those unknown return types with explicit, self-describing interfaces or types (e.g., AppViewState for getSnapshot, ThreadView or CreateThreadResponse for createThread, DeleteThreadResponse for deleteThread, MarkThreadVisitedResponse for markThreadVisited, and SetRuntimeModeResponse for setRuntimeMode) and update the signatures in the state block to use those concrete types so callers get proper typings and avoid casting.packages/core/src/domain/models.ts (2)
6-134: Introduce branded ID schemas instead of plain strings.Many entity IDs are
Schema.String, which makes cross-entity mixing easy. Consider defining branded ID schemas (ProjectId, ThreadId, SessionId, etc.) and reusing them across models to keep domain precision.Based on learnings: Applies to packages/{core,domain}//*.{ts,tsx} : Use branded types for IDs - AccountId, CompanyId, etc.; Applies to packages/{core,domain,persistence}//*.{ts,tsx} : Always use precise domain schemas - never lose type precision by using primitives when a richer domain type exists.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/core/src/domain/models.ts` around lines 6 - 134, Many schema fields use plain Schema.String for identifiers (e.g., id fields in ProjectScriptSchema, ProjectViewSchema, ProviderSessionViewSchema, ThreadViewSchema, ProviderEventSchema, TurnDiffSummarySchema, ThreadTerminalGroupSchema), which allows accidental cross-entity mixing; define branded ID schemas (e.g., ProjectIdSchema, ThreadIdSchema, SessionIdSchema, ThreadIdSchema, ProviderEventIdSchema) using Schema.Brand/Schema.Ref or a simple Schema.String.extend pattern and corresponding Type aliases (ProjectId, ThreadId, SessionId, etc.), then replace all Schema.String occurrences used as entity identifiers and related references (sessionId, projectId, threadId, turnId, itemId, assistantMessageId, activeTerminalId, activeTerminalGroupId, etc.) with the appropriate branded schemas and update the dependent types (ProjectView, ThreadView, ProviderSessionView, ProviderEvent, TurnDiffSummary, ThreadTerminalGroup, etc.) across the affected files so ID types are consistent and domain-precise.
6-22: Consider Schema.Class for domain models with constructors and equality semantics.Schema.Class provides constructor generation, Equal, and Hash implementations. For models like
ProjectScriptandProjectViewthat may benefit from these capabilities in the future, consider migrating fromSchema.StructtoSchema.Class.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/core/src/domain/models.ts` around lines 6 - 22, ProjectScriptSchema and ProjectViewSchema currently use Schema.Struct but should be migrated to Schema.Class to generate constructors and provide Equal/Hash semantics; replace Schema.Struct({...}) with Schema.Class(...) for ProjectScriptSchema and ProjectViewSchema, keep the same field definitions (id, name, command, keybinding, cwd, model, expanded, scripts) and ensure ProjectScript and ProjectView types remain derived via Schema.Schema.Type<typeof ProjectScriptSchema> and Schema.Schema.Type<typeof ProjectViewSchema>; after switching to Schema.Class, confirm any code that instantiates these models uses the generated constructors and that the scripts field still references ProjectScriptSchema (e.g., Schema.Array(ProjectScriptSchema)).packages/core/src/domain/events.ts (1)
28-108: Prefer branded ID schemas andSchema.Classfor domain payloads.Consider switching payload schemas to
Schema.Classand using branded ID schemas (if available in domain models) instead of rawSchema.Stringto preserve type precision.
Based on learnings: Always use precise domain schemas - never lose type precision by using primitives when a richer domain type exists; Use Schema.Class over Schema.Struct - classes provide constructor, Equal, Hash; Use branded types for IDs - AccountId, CompanyId, etc.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/core/src/domain/events.ts` around lines 28 - 108, Switch the payload schemas from Schema.Struct to Schema.Class and replace raw Schema.String IDs with the appropriate branded ID schemas (e.g., ProjectIdSchema, ThreadIdSchema, MessageIdSchema, etc.) to preserve domain type precision: update AppBootstrappedPayloadSchema, ProjectAddedPayloadSchema, ProjectRemovedPayloadSchema, ProjectScriptsUpdatedPayloadSchema, ThreadCreatedPayloadSchema, ThreadDeletedPayloadSchema, ThreadUserMessageAddedPayloadSchema, ThreadProviderSessionUpdatedPayloadSchema, ThreadProviderEventRecordedPayloadSchema, ThreadBranchSetPayloadSchema, ThreadTerminalActivitySetPayloadSchema, ThreadMarkVisitedPayloadSchema and RuntimeModeSetPayloadSchema to use Schema.Class(SomePayloadClass) and reference existing branded ID schemas and existing domain schemas (ProjectScriptSchema, ProviderSessionViewSchema, ProviderEventSchema, RuntimeModeSchema) instead of Schema.String for identifiers and other domain-typed fields. Ensure constructors/Equals/Hash are available by defining/using the payload classes and export them where necessary.apps/server/src/wsServer.ts (2)
126-146: Consider usingbindTerminalEventsfor consistency with provider events.Terminal events are dispatched to
coreRuntimeinline (lines 132-143), while provider events usecoreRuntime.bindProviderEvents(providerManager)at line 146. For consistency and to centralize the dispatch logic, consider adding a call tocoreRuntime.bindTerminalEvents(terminalManager)and removing the inline dispatch.♻️ Proposed refactor for consistency
const onTerminalEvent = (event: TerminalEvent) => { broadcastPush({ type: "push", channel: WS_CHANNELS.terminalEvent, data: event, }); - if (event.type === "activity" || event.type === "exited" || event.type === "error") { - void coreRuntime.dispatch({ - id: crypto.randomUUID(), - type: "thread.setTerminalActivity", - issuedAt: event.createdAt, - payload: { - threadId: event.threadId, - terminalId: event.terminalId, - running: event.type === "activity" ? event.hasRunningSubprocess : false, - }, - }); - } }; terminalManager.on("event", onTerminalEvent); coreRuntime.bindProviderEvents(providerManager); + coreRuntime.bindTerminalEvents(terminalManager);🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/server/src/wsServer.ts` around lines 126 - 146, Replace the inline terminal-event dispatch logic with a centralized binding: remove the onTerminalEvent handler and the terminalManager.on("event", onTerminalEvent) registration (which contains the coreRuntime.dispatch call), and instead call coreRuntime.bindTerminalEvents(terminalManager) alongside coreRuntime.bindProviderEvents(providerManager) so terminal events are handled consistently by coreRuntime; ensure any needed payload shape (threadId, terminalId, running, issuedAt) matches what bindTerminalEvents expects.
295-303: Consider validating request params with Zod schemas from contracts.The current approach casts
request.paramsasRecord<string, unknown>and usesas neverthroughout (e.g., line 306). This bypasses type safety and could lead to runtime errors if clients send malformed requests.Since the coding guidelines specify using Zod schemas from
packages/contractsfor shared type contracts, consider defining and validating request param schemas for each method.As per coding guidelines: "Use Zod schemas from
packages/contractsfor shared type contracts covering provider events, WebSocket protocol, and model/session types."🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/server/src/wsServer.ts` around lines 295 - 303, The code in routeRequest is unsafely casting request.params to Record<string, unknown> and later using `as never`; replace that with schema validation: import the appropriate Zod schemas from packages/contracts for the WebSocket request params you support, validate request.params at the start of routeRequest (before using paramsObj or findThreadBySessionId) and reject/return an error if validation fails; update callers to use the typed parsed output instead of `as never` so functions like routeRequest, the params handling logic, and any usages of findThreadBySessionId/coreRuntime.state consume the validated, correctly typed object rather than an unchecked cast.apps/web/src/components/Sidebar.tsx (2)
346-356: Remove unuseddispatchfromhandleThreadContextMenudependency array.Similar to
addProjectFromPath,dispatchis listed in the dependency array but is not used withinhandleThreadContextMenuafter the migration toapi.state.deleteThread.♻️ Proposed fix
[ api, appSettings.confirmThreadDelete, - dispatch, navigate, removeWorktreeMutation, routeThreadId, state.projects, state.threads, ],🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/web/src/components/Sidebar.tsx` around lines 346 - 356, The dependency array for the handleThreadContextMenu callback includes an unused dispatch reference; remove dispatch from that array so dependencies only list values actually used (api, appSettings.confirmThreadDelete, navigate, removeWorktreeMutation, routeThreadId, state.projects, state.threads). Open the handleThreadContextMenu definition and update its dependency array to drop dispatch (similar to the addProjectFromPath change) ensuring no other references to dispatch remain in the function body before committing.
224-232: Removedispatchfrom dependency array if no longer used inaddProjectFromPath.The
dispatchfunction is listed in the dependency array but doesn't appear to be called withinaddProjectFromPathafter the migration to API-based project/thread creation. This could be a leftover from the refactoring.♻️ Proposed fix to clean up dependency array
[ api, - dispatch, focusMostRecentThreadForProject, handleNewThread, isAddingProject, state.projects, ],🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/web/src/components/Sidebar.tsx` around lines 224 - 232, The dependency array for the effect that defines addProjectFromPath still includes dispatch even though addProjectFromPath no longer uses it; remove dispatch from that dependency array to avoid unnecessary re-renders and stale lint warnings. Locate the addProjectFromPath function and the surrounding React effect where the array is declared (references: addProjectFromPath, the dependency array containing api, dispatch, focusMostRecentThreadForProject, handleNewThread, isAddingProject, state.projects) and delete dispatch from the list, then run the linter/tests to ensure nothing else relies on dispatch in that scope.apps/server/src/coreRuntime.ts (1)
94-107: Consider caching sessionId-to-threadId mapping for high-frequency event ingestion.
ingestProviderEventfetches the full state and performs a linear search through all threads for every provider event. During active streaming, this could become a hot path.A simple Map cache (invalidated when sessions are bound/cleared) would reduce the overhead.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/server/src/coreRuntime.ts` around lines 94 - 107, ingestProviderEvent currently calls this.state() and linearly searches state.threads for matching session.sessionId on every event; replace that hot path with a cached Map from sessionId to threadId stored on the instance (e.g., this.sessionToThreadMap) and use it inside ingestProviderEvent to look up target threadId without scanning threads, then call dispatch with the found id; ensure the cache is updated/invalidated in the methods that bind or clear sessions (the same code paths that assign/clear thread.session.*, such as session-binding/clearing handlers) and fall back to the existing state()+scan if a sessionId is not in the cache so behavior remains correct.packages/core/src/projections/reducer.ts (2)
155-365: Consider using discriminated union types for event payloads instead of type assertions.The
applyDomainEventfunction usesastype assertions for every event payload (e.g., lines 158, 177, 194, etc.). This loses type safety and could lead to runtime errors if the payload structure changes.Since you're using Effect Schema elsewhere in the domain layer, consider defining a proper discriminated union for
DomainEventEnvelopewhere eachtypediscriminant carries its typed payload. This would enable TypeScript to narrow the payload type automatically in eachcasebranch.Based on learnings: "Always use precise domain schemas - never lose type precision by using primitives when a richer domain type exists."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/core/src/projections/reducer.ts` around lines 155 - 365, The switch in applyDomainEvent is using unsafe "as" assertions for each payload (e.g., in "app.bootstrapped", "project.added", "thread.created", "thread.providerEventRecorded", etc.); replace this by defining a discriminated union type for DomainEventEnvelope where the "type" literal discriminant maps to a properly typed payload for each event, update the DomainEventEnvelope import/definition so TypeScript narrows payload inside each case, and remove all payload "as" casts in applyDomainEvent (so branches like applyDomainEvent -> case "project.added", "thread.providerEventRecorded", "app.bootstrapped" will automatically get the correct typed payload) — adjust any dependent helper types/evolveSession/applyEventToMessages signatures as needed to accept the precise payload types.
116-120: Avoid unnecessary array allocation on no-op paths.The fallback return at line 119 creates a new array copy even when no mutation is needed. Since this function is called for every provider event during streaming, this adds allocation pressure.
♻️ Proposed fix to return the original array when unchanged
if (event.method === "turn/completed") { return previous.map((entry) => ({ ...entry, streaming: false })); } - return [...previous]; + return previous as ChatMessage[]; }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/core/src/projections/reducer.ts` around lines 116 - 120, The fallback currently returns a shallow copy ([...previous]) causing unnecessary allocations; change the no-op path to return the original array (previous) instead so you only allocate when you actually map/update entries (e.g., in the branch handling event.method === "turn/completed"). Ensure callers do not mutate the returned array in-place before making this change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@apps/server/src/coreRuntime.ts`:
- Around line 54-58: The current fire-and-forget call in bindProviderEvents
silently swallows errors by using "void this.ingestProviderEvent(event)"; change
it to attach explicit error handling so ingestion failures are surfaced and
logged—e.g. call this.ingestProviderEvent(event).catch(err => /* log with
context */) and include contextual info (event, ProviderManager) in the message;
reference bindProviderEvents and ingestProviderEvent and use the class logger if
available (this.logger.error) or console.error as a fallback.
In `@apps/server/src/wsServer.ts`:
- Around line 656-660: The current shutdown uses Promise.race([stateStreamTask,
Promise.resolve()]) which always resolves immediately; replace that with either
a direct await of stateStreamTask for a graceful shutdown or implement a proper
timeout fallback using Promise.race([stateStreamTask, new Promise(resolve =>
setTimeout(resolve, TIMEOUT_MS))]) so coreRuntime.stop() waits for the iterator
to finish or the timeout to elapse; update the code around stateStreamTask and
the call to coreRuntime.stop() accordingly and choose a sensible TIMEOUT_MS
constant if using the timeout approach.
- Around line 586-605: The onListening callback currently uses a nested async
IIFE which can resolve the outer promise before startup failures are observed;
refactor so you await coreRuntime.start(cwd, projectName) directly in the
surrounding async function (or return the Promise from the async callback) and
only call resolve() after start succeeds, then launch the state stream
subscription as a separate background task (stateStreamTask) using
coreRuntime.subscribe() and for-await to call broadcastPush({ type: "push",
channel: WS_CHANNELS.stateUpdated, data: update.state }) while honoring
stateStreamStopped; ensure any error from coreRuntime.start is caught and
immediately passed to reject(error) so startup errors propagate correctly
instead of being swallowed by a nested IIFE.
In `@apps/web/src/routes/__root.tsx`:
- Around line 131-146: The getSnapshot() promise can arrive after a newer server
push and unvalidated cast bypasses contracts; update the effect to validate and
gate application: when calling api.state.getSnapshot() (and in the
onServerStateUpdate handler) parse the payload using the AppState Zod schema
from packages/contracts before dispatch, and keep a local ref (e.g.,
latestServerVersionRef or lastPositionRef) that you update whenever
onServerStateUpdate runs so that the awaited getSnapshot() is ignored if its
version/lastPosition is older than latestServerVersionRef; only dispatch({ type:
"SET_SERVER_STATE", state: parsed }) when the parsed payload validates and is
newer than the ref.
In `@apps/web/src/wsNativeApi.ts`:
- Around line 66-75: Validate the payload inside the transport.subscribe handler
for WS_CHANNELS.stateUpdated by importing and using the Zod schema (e.g.,
stateUpdatedSchema) from '@t3tools/contracts' to parse the incoming data before
updating lastStateSnapshot or calling stateListeners; use schema.safeParse(data)
and only set lastStateSnapshot / notify listeners when parsing succeeds,
otherwise log or ignore the invalid payload and do not call the listeners (keep
the transport.subscribe callback and variables lastStateSnapshot and
stateListeners as the targets to modify).
In `@packages/core/src/application/decide.ts`:
- Around line 71-72: Change the event type from the imperative
"thread.markVisited" to the past-tense "thread.markedVisited" for consistency:
update the case in decide.ts (the switch branch that returns
eventFromCommand(command, "thread.markVisited", ...)) to use
"thread.markedVisited", and then update the corresponding declaration(s) in
packages/core/src/domain/events.ts (and any other places that reference
"thread.markVisited", e.g., handlers, reducers, tests) to the new
"thread.markedVisited" string so all usages remain consistent.
In `@packages/core/src/application/engine.ts`:
- Around line 37-44: The execute method may return a stale AppViewState because
projector.enqueue(appended) is asynchronous and projection can finish later;
change the flow so execute awaits projection completion: update
Projector.enqueue to return a Promise that resolves when the appended events are
fully projected (or add a Projector.flush/completion method) and then in execute
(function execute(command: AnyDomainCommand)) await that promise after calling
eventStore.append(appended) before returning this.currentState(); ensure the
appended events are forwarded to the enqueue/flush call so the projection
completion corresponds to those events.
In `@packages/core/src/application/fanout.ts`:
- Around line 50-63: EffectFanout.subscribe currently adds a sink to the sinks
set but never removes it, leaking sinks when subscribers abandon their iterator;
modify subscribe to wrap/augment the AsyncIterable/AsyncIterator returned by
createSink<StateUpdatedNotification>() so that its return() and throw() methods
close the sink and remove it from this.sinks (and ensure any normal completion
also removes it), keeping publish() unchanged; reference EffectFanout,
subscribe, publish, sinks, and createSink to locate where to add iterator
cleanup logic.
In `@packages/core/src/application/projector.ts`:
- Around line 16-49: The drain loop can be started multiple times concurrently
via enqueue(); add an "in-flight drain" guard (e.g., a private property like
inFlightDrain: Promise<void> | null) so only one drain() runs at a time: have
drain() set inFlightDrain to its running Promise, clear inFlightDrain when the
loop finishes, and update enqueue() (and start()) to only call void this.drain()
if inFlightDrain is null; reference the existing methods drain(), enqueue(),
start(), and the processing/stopSignal flags when making the change.
---
Outside diff comments:
In `@apps/web/src/components/ChatView.tsx`:
- Around line 1072-1094: The optimistic dispatch in handleRuntimeModeChange
currently calls api.state.setRuntimeMode without error handling, so if that API
fails the local state (dispatch({ type: "SET_RUNTIME_MODE", mode })) will be
inconsistent; wrap the api.state.setRuntimeMode call in a try-catch and on error
dispatch a rollback (dispatch the previous runtime mode) and surface or log the
error, ensuring you still call scheduleComposerFocus only after success or
restore focus state appropriately; keep the existing session stop logic and
setIsSwitchingRuntimeMode behavior but ensure setIsSwitchingRuntimeMode is only
triggered after the API call succeeds (or is reverted on failure).
In `@apps/web/src/components/Sidebar.tsx`:
- Around line 382-400: The non-Electron branch after user confirmation currently
does nothing; update the else (isElectron === false) path in the project
deletion handler in Sidebar.tsx to call await api.projects.remove({ id:
projectId }) (same as the Electron branch), wrap it in try/catch, log and toast
errors using the existing toastManager pattern, and on success dispatch the
existing state update (remove project from state.projects and cleanup
state.threads) so the UI is updated; reference api.projects.remove, projectId,
toastManager, dispatch, state.projects, and state.threads to locate and mirror
the Electron-branch behavior.
- Around line 142-170: The handler handleNewThread currently uses optional
chaining on api when awaiting createThread, so if api is undefined the server
call is skipped but navigation still proceeds; fix this by guarding early: check
api is defined (e.g., if (!api) { handle error/notify user and return }) before
calling api.state.createThread, then call await api.state.createThread(...) (no
optional chaining) inside a try/catch and only call navigate after the awaited
createThread succeeds; reference handleNewThread, api, api.state.createThread,
and navigate when making the change.
In `@apps/web/src/wsNativeApi.ts`:
- Around line 113-124: providers.onEvent is currently a no-op, so the
providers.event WebSocket stream isn't consumed; replace the no-op with an
implementation that subscribes to the push channel (use
WS_METHODS.providersEvent) and returns an unsubscribe function. Concretely,
change the providers.onEvent implementation to accept a callback and call the
transport subscription helper (e.g. transport.subscribe or the equivalent in
this file) with WS_METHODS.providersEvent and that callback, then return the
unsubscribe function from transport so the caller can stop listening.
---
Nitpick comments:
In `@apps/server/src/coreRuntime.ts`:
- Around line 94-107: ingestProviderEvent currently calls this.state() and
linearly searches state.threads for matching session.sessionId on every event;
replace that hot path with a cached Map from sessionId to threadId stored on the
instance (e.g., this.sessionToThreadMap) and use it inside ingestProviderEvent
to look up target threadId without scanning threads, then call dispatch with the
found id; ensure the cache is updated/invalidated in the methods that bind or
clear sessions (the same code paths that assign/clear thread.session.*, such as
session-binding/clearing handlers) and fall back to the existing state()+scan if
a sessionId is not in the cache so behavior remains correct.
In `@apps/server/src/wsServer.ts`:
- Around line 126-146: Replace the inline terminal-event dispatch logic with a
centralized binding: remove the onTerminalEvent handler and the
terminalManager.on("event", onTerminalEvent) registration (which contains the
coreRuntime.dispatch call), and instead call
coreRuntime.bindTerminalEvents(terminalManager) alongside
coreRuntime.bindProviderEvents(providerManager) so terminal events are handled
consistently by coreRuntime; ensure any needed payload shape (threadId,
terminalId, running, issuedAt) matches what bindTerminalEvents expects.
- Around line 295-303: The code in routeRequest is unsafely casting
request.params to Record<string, unknown> and later using `as never`; replace
that with schema validation: import the appropriate Zod schemas from
packages/contracts for the WebSocket request params you support, validate
request.params at the start of routeRequest (before using paramsObj or
findThreadBySessionId) and reject/return an error if validation fails; update
callers to use the typed parsed output instead of `as never` so functions like
routeRequest, the params handling logic, and any usages of
findThreadBySessionId/coreRuntime.state consume the validated, correctly typed
object rather than an unchecked cast.
In `@apps/web/src/components/Sidebar.tsx`:
- Around line 346-356: The dependency array for the handleThreadContextMenu
callback includes an unused dispatch reference; remove dispatch from that array
so dependencies only list values actually used (api,
appSettings.confirmThreadDelete, navigate, removeWorktreeMutation,
routeThreadId, state.projects, state.threads). Open the handleThreadContextMenu
definition and update its dependency array to drop dispatch (similar to the
addProjectFromPath change) ensuring no other references to dispatch remain in
the function body before committing.
- Around line 224-232: The dependency array for the effect that defines
addProjectFromPath still includes dispatch even though addProjectFromPath no
longer uses it; remove dispatch from that dependency array to avoid unnecessary
re-renders and stale lint warnings. Locate the addProjectFromPath function and
the surrounding React effect where the array is declared (references:
addProjectFromPath, the dependency array containing api, dispatch,
focusMostRecentThreadForProject, handleNewThread, isAddingProject,
state.projects) and delete dispatch from the list, then run the linter/tests to
ensure nothing else relies on dispatch in that scope.
In `@packages/contracts/src/ipc.ts`:
- Line 45: Define a single canonical RuntimeMode type in the contracts package
by exporting the existing type alias RuntimeMode ("approval-required" |
"full-access") from the contracts IPC module and then remove duplicate local
definitions elsewhere; update other modules (IPC, UI, core) to import
RuntimeMode from that exported symbol instead of redefining it so all consumers
share the same exported RuntimeMode type.
- Around line 144-158: The state API currently returns Promise<unknown> for
methods (getSnapshot, createThread, deleteThread, markThreadVisited,
setRuntimeMode); replace those unknown return types with explicit,
self-describing interfaces or types (e.g., AppViewState for getSnapshot,
ThreadView or CreateThreadResponse for createThread, DeleteThreadResponse for
deleteThread, MarkThreadVisitedResponse for markThreadVisited, and
SetRuntimeModeResponse for setRuntimeMode) and update the signatures in the
state block to use those concrete types so callers get proper typings and avoid
casting.
In `@packages/core/src/domain/events.ts`:
- Around line 28-108: Switch the payload schemas from Schema.Struct to
Schema.Class and replace raw Schema.String IDs with the appropriate branded ID
schemas (e.g., ProjectIdSchema, ThreadIdSchema, MessageIdSchema, etc.) to
preserve domain type precision: update AppBootstrappedPayloadSchema,
ProjectAddedPayloadSchema, ProjectRemovedPayloadSchema,
ProjectScriptsUpdatedPayloadSchema, ThreadCreatedPayloadSchema,
ThreadDeletedPayloadSchema, ThreadUserMessageAddedPayloadSchema,
ThreadProviderSessionUpdatedPayloadSchema,
ThreadProviderEventRecordedPayloadSchema, ThreadBranchSetPayloadSchema,
ThreadTerminalActivitySetPayloadSchema, ThreadMarkVisitedPayloadSchema and
RuntimeModeSetPayloadSchema to use Schema.Class(SomePayloadClass) and reference
existing branded ID schemas and existing domain schemas (ProjectScriptSchema,
ProviderSessionViewSchema, ProviderEventSchema, RuntimeModeSchema) instead of
Schema.String for identifiers and other domain-typed fields. Ensure
constructors/Equals/Hash are available by defining/using the payload classes and
export them where necessary.
In `@packages/core/src/domain/models.ts`:
- Around line 6-134: Many schema fields use plain Schema.String for identifiers
(e.g., id fields in ProjectScriptSchema, ProjectViewSchema,
ProviderSessionViewSchema, ThreadViewSchema, ProviderEventSchema,
TurnDiffSummarySchema, ThreadTerminalGroupSchema), which allows accidental
cross-entity mixing; define branded ID schemas (e.g., ProjectIdSchema,
ThreadIdSchema, SessionIdSchema, ThreadIdSchema, ProviderEventIdSchema) using
Schema.Brand/Schema.Ref or a simple Schema.String.extend pattern and
corresponding Type aliases (ProjectId, ThreadId, SessionId, etc.), then replace
all Schema.String occurrences used as entity identifiers and related references
(sessionId, projectId, threadId, turnId, itemId, assistantMessageId,
activeTerminalId, activeTerminalGroupId, etc.) with the appropriate branded
schemas and update the dependent types (ProjectView, ThreadView,
ProviderSessionView, ProviderEvent, TurnDiffSummary, ThreadTerminalGroup, etc.)
across the affected files so ID types are consistent and domain-precise.
- Around line 6-22: ProjectScriptSchema and ProjectViewSchema currently use
Schema.Struct but should be migrated to Schema.Class to generate constructors
and provide Equal/Hash semantics; replace Schema.Struct({...}) with
Schema.Class(...) for ProjectScriptSchema and ProjectViewSchema, keep the same
field definitions (id, name, command, keybinding, cwd, model, expanded, scripts)
and ensure ProjectScript and ProjectView types remain derived via
Schema.Schema.Type<typeof ProjectScriptSchema> and Schema.Schema.Type<typeof
ProjectViewSchema>; after switching to Schema.Class, confirm any code that
instantiates these models uses the generated constructors and that the scripts
field still references ProjectScriptSchema (e.g.,
Schema.Array(ProjectScriptSchema)).
In `@packages/core/src/projections/reducer.ts`:
- Around line 155-365: The switch in applyDomainEvent is using unsafe "as"
assertions for each payload (e.g., in "app.bootstrapped", "project.added",
"thread.created", "thread.providerEventRecorded", etc.); replace this by
defining a discriminated union type for DomainEventEnvelope where the "type"
literal discriminant maps to a properly typed payload for each event, update the
DomainEventEnvelope import/definition so TypeScript narrows payload inside each
case, and remove all payload "as" casts in applyDomainEvent (so branches like
applyDomainEvent -> case "project.added", "thread.providerEventRecorded",
"app.bootstrapped" will automatically get the correct typed payload) — adjust
any dependent helper types/evolveSession/applyEventToMessages signatures as
needed to accept the precise payload types.
- Around line 116-120: The fallback currently returns a shallow copy
([...previous]) causing unnecessary allocations; change the no-op path to return
the original array (previous) instead so you only allocate when you actually
map/update entries (e.g., in the branch handling event.method ===
"turn/completed"). Ensure callers do not mutate the returned array in-place
before making this change.
| bindProviderEvents(providerManager: ProviderManager): void { | ||
| providerManager.on("event", (event: ProviderEvent) => { | ||
| void this.ingestProviderEvent(event); | ||
| }); | ||
| } |
There was a problem hiding this comment.
Unhandled errors in fire-and-forget provider event ingestion.
The void this.ingestProviderEvent(event) pattern silently swallows any errors that occur during event ingestion. Under load or during failures, this could lead to lost events without any indication.
Consider adding error handling to maintain predictable behavior during failures.
🛡️ Proposed fix to log ingestion errors
bindProviderEvents(providerManager: ProviderManager): void {
providerManager.on("event", (event: ProviderEvent) => {
- void this.ingestProviderEvent(event);
+ this.ingestProviderEvent(event).catch((err) => {
+ console.error("Failed to ingest provider event", { sessionId: event.sessionId, error: err });
+ });
});
}As per coding guidelines: "Maintain predictable behavior under load and during failures (session restarts, reconnects, partial streams)."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/server/src/coreRuntime.ts` around lines 54 - 58, The current
fire-and-forget call in bindProviderEvents silently swallows errors by using
"void this.ingestProviderEvent(event)"; change it to attach explicit error
handling so ingestion failures are surfaced and logged—e.g. call
this.ingestProviderEvent(event).catch(err => /* log with context */) and include
contextual info (event, ProviderManager) in the message; reference
bindProviderEvents and ingestProviderEvent and use the class logger if available
(this.logger.error) or console.error as a fallback.
| void (async () => { | ||
| try { | ||
| await coreRuntime.start(cwd, projectName); | ||
| stateStreamStopped = false; | ||
| stateStreamTask = (async () => { | ||
| const iterable = await coreRuntime.subscribe(); | ||
| for await (const update of iterable) { | ||
| if (stateStreamStopped) break; | ||
| broadcastPush({ | ||
| type: "push", | ||
| channel: WS_CHANNELS.stateUpdated, | ||
| data: update.state, | ||
| }); | ||
| } | ||
| })(); | ||
| resolve(); | ||
| } catch (error) { | ||
| reject(error as Error); | ||
| } | ||
| })(); |
There was a problem hiding this comment.
Avoid nested async IIFE in callback—handle errors consistently.
The onListening callback wraps an async IIFE that catches errors but calls reject(error as Error) after the outer Promise may have already resolved via the listener setup. If coreRuntime.start() fails, the reject might not propagate correctly to the caller.
Consider restructuring to ensure startup errors are always propagated.
🛡️ Proposed restructuring
const onListening = () => {
httpServer.off("error", onError);
- void (async () => {
- try {
- await coreRuntime.start(cwd, projectName);
- stateStreamStopped = false;
- stateStreamTask = (async () => {
- const iterable = await coreRuntime.subscribe();
- for await (const update of iterable) {
- if (stateStreamStopped) break;
- broadcastPush({
- type: "push",
- channel: WS_CHANNELS.stateUpdated,
- data: update.state,
- });
- }
- })();
- resolve();
- } catch (error) {
- reject(error as Error);
- }
- })();
+ coreRuntime.start(cwd, projectName)
+ .then(() => {
+ stateStreamStopped = false;
+ stateStreamTask = (async () => {
+ const iterable = await coreRuntime.subscribe();
+ for await (const update of iterable) {
+ if (stateStreamStopped) break;
+ broadcastPush({
+ type: "push",
+ channel: WS_CHANNELS.stateUpdated,
+ data: update.state,
+ });
+ }
+ })();
+ resolve();
+ })
+ .catch((error) => reject(error as Error));
};🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/server/src/wsServer.ts` around lines 586 - 605, The onListening callback
currently uses a nested async IIFE which can resolve the outer promise before
startup failures are observed; refactor so you await coreRuntime.start(cwd,
projectName) directly in the surrounding async function (or return the Promise
from the async callback) and only call resolve() after start succeeds, then
launch the state stream subscription as a separate background task
(stateStreamTask) using coreRuntime.subscribe() and for-await to call
broadcastPush({ type: "push", channel: WS_CHANNELS.stateUpdated, data:
update.state }) while honoring stateStreamStopped; ensure any error from
coreRuntime.start is caught and immediately passed to reject(error) so startup
errors propagate correctly instead of being swallowed by a nested IIFE.
| if (stateStreamTask) { | ||
| await Promise.race([stateStreamTask, Promise.resolve()]); | ||
| } | ||
| await coreRuntime.stop(); | ||
| await Promise.all([closeWebSocketServer, closeHttpServer]); |
There was a problem hiding this comment.
Promise.race with Promise.resolve() doesn't wait for stateStreamTask.
The shutdown sequence uses Promise.race([stateStreamTask, Promise.resolve()]) which always resolves immediately due to Promise.resolve(). This means stateStreamTask may still be running when coreRuntime.stop() is called, potentially causing issues if the iterator is still active.
If the intent is a timeout, use setTimeout. If the intent is graceful completion, await stateStreamTask directly (with a timeout fallback).
🛡️ Proposed fix with timeout
if (stateStreamTask) {
- await Promise.race([stateStreamTask, Promise.resolve()]);
+ const timeout = new Promise<void>((resolve) => setTimeout(resolve, 1000));
+ await Promise.race([stateStreamTask, timeout]);
}
await coreRuntime.stop();📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if (stateStreamTask) { | |
| await Promise.race([stateStreamTask, Promise.resolve()]); | |
| } | |
| await coreRuntime.stop(); | |
| await Promise.all([closeWebSocketServer, closeHttpServer]); | |
| if (stateStreamTask) { | |
| const timeout = new Promise<void>((resolve) => setTimeout(resolve, 1000)); | |
| await Promise.race([stateStreamTask, timeout]); | |
| } | |
| await coreRuntime.stop(); | |
| await Promise.all([closeWebSocketServer, closeHttpServer]); |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/server/src/wsServer.ts` around lines 656 - 660, The current shutdown
uses Promise.race([stateStreamTask, Promise.resolve()]) which always resolves
immediately; replace that with either a direct await of stateStreamTask for a
graceful shutdown or implement a proper timeout fallback using
Promise.race([stateStreamTask, new Promise(resolve => setTimeout(resolve,
TIMEOUT_MS))]) so coreRuntime.stop() waits for the iterator to finish or the
timeout to elapse; update the code around stateStreamTask and the call to
coreRuntime.stop() accordingly and choose a sensible TIMEOUT_MS constant if
using the timeout approach.
| useEffect(() => { | ||
| if (!api) return; | ||
| return api.providers.onEvent((event) => { | ||
| if (event.method === "turn/completed") { | ||
| void invalidateGitQueries(queryClient); | ||
| } | ||
| if (event.method === "checkpoint/captured") { | ||
| const payload = event.payload as { turnCount?: number } | undefined; | ||
| const turnCount = payload?.turnCount; | ||
| void queryClient.invalidateQueries({ | ||
| queryKey: ["providers", "checkpointDiff"] as const, | ||
| predicate: (query) => { | ||
| if (typeof turnCount !== "number") return true; | ||
| return query.queryKey[5] === turnCount; | ||
| }, | ||
| }); | ||
| } | ||
| if (!activeThreadId) return; | ||
| dispatch({ | ||
| type: "APPLY_EVENT", | ||
| event, | ||
| activeAssistantItemRef, | ||
| activeThreadId, | ||
| }); | ||
| let mounted = true; | ||
| void api.state.getSnapshot().then((snapshot) => { | ||
| if (!mounted) return; | ||
| dispatch({ type: "SET_SERVER_STATE", state: snapshot as AppState }); | ||
| }); | ||
| }, [activeThreadId, api, dispatch, queryClient]); | ||
| return () => { | ||
| mounted = false; | ||
| }; | ||
| }, [api, dispatch]); | ||
|
|
||
| useEffect(() => { | ||
| if (!activeThreadId) return; | ||
| dispatch({ | ||
| type: "MARK_THREAD_VISITED", | ||
| threadId: activeThreadId, | ||
| visitedAt: new Date().toISOString(), | ||
| return onServerStateUpdate((snapshot) => { | ||
| dispatch({ type: "SET_SERVER_STATE", state: snapshot as AppState }); | ||
| }); |
There was a problem hiding this comment.
Prevent stale snapshot overwrites and validate payloads.
getSnapshot() can resolve after a newer state.updated push, which would overwrite fresher state. Also, snapshot as AppState bypasses the contracts schema. Please parse with the contracts schema and ignore snapshots older than the latest push (e.g., compare a lastPosition/version if available, or gate snapshot application once any push has arrived).
As per coding guidelines: Use Zod schemas from packages/contracts for shared type contracts covering provider events, WebSocket protocol, and model/session types.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/web/src/routes/__root.tsx` around lines 131 - 146, The getSnapshot()
promise can arrive after a newer server push and unvalidated cast bypasses
contracts; update the effect to validate and gate application: when calling
api.state.getSnapshot() (and in the onServerStateUpdate handler) parse the
payload using the AppState Zod schema from packages/contracts before dispatch,
and keep a local ref (e.g., latestServerVersionRef or lastPositionRef) that you
update whenever onServerStateUpdate runs so that the awaited getSnapshot() is
ignored if its version/lastPosition is older than latestServerVersionRef; only
dispatch({ type: "SET_SERVER_STATE", state: parsed }) when the parsed payload
validates and is newer than the ref.
| transport.subscribe(WS_CHANNELS.stateUpdated, (data) => { | ||
| lastStateSnapshot = data; | ||
| for (const listener of stateListeners) { | ||
| try { | ||
| listener(data); | ||
| } catch { | ||
| // Swallow listener errors | ||
| } | ||
| } | ||
| }); |
There was a problem hiding this comment.
Validate state.updated payloads with contracts schemas before caching/dispatching.
The WS payload is currently treated as unknown and forwarded without schema validation. Please parse it with the Zod schema from @t3tools/contracts for the state.updated payload and only then update lastStateSnapshot / notify listeners.
As per coding guidelines: Use Zod schemas from packages/contracts for shared type contracts covering provider events, WebSocket protocol, and model/session types.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/web/src/wsNativeApi.ts` around lines 66 - 75, Validate the payload
inside the transport.subscribe handler for WS_CHANNELS.stateUpdated by importing
and using the Zod schema (e.g., stateUpdatedSchema) from '@t3tools/contracts' to
parse the incoming data before updating lastStateSnapshot or calling
stateListeners; use schema.safeParse(data) and only set lastStateSnapshot /
notify listeners when parsing succeeds, otherwise log or ignore the invalid
payload and do not call the listeners (keep the transport.subscribe callback and
variables lastStateSnapshot and stateListeners as the targets to modify).
| case "thread.markVisited": | ||
| return [eventFromCommand(command, "thread.markVisited", command.payload, streamIdFromCommand(command))]; |
There was a problem hiding this comment.
Event type naming inconsistency: should use past tense.
All other event types follow the past-tense convention (e.g., thread.created, thread.deleted, thread.branchSet), but thread.markVisited remains in imperative form. This should be thread.markedVisited for consistency.
Proposed fix
case "thread.markVisited":
- return [eventFromCommand(command, "thread.markVisited", command.payload, streamIdFromCommand(command))];
+ return [eventFromCommand(command, "thread.markedVisited", command.payload, streamIdFromCommand(command))];Also update the corresponding event type in packages/core/src/domain/events.ts if applicable.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| case "thread.markVisited": | |
| return [eventFromCommand(command, "thread.markVisited", command.payload, streamIdFromCommand(command))]; | |
| case "thread.markVisited": | |
| return [eventFromCommand(command, "thread.markedVisited", command.payload, streamIdFromCommand(command))]; |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/core/src/application/decide.ts` around lines 71 - 72, Change the
event type from the imperative "thread.markVisited" to the past-tense
"thread.markedVisited" for consistency: update the case in decide.ts (the switch
branch that returns eventFromCommand(command, "thread.markVisited", ...)) to use
"thread.markedVisited", and then update the corresponding declaration(s) in
packages/core/src/domain/events.ts (and any other places that reference
"thread.markVisited", e.g., handlers, reducers, tests) to the new
"thread.markedVisited" string so all usages remain consistent.
| async execute(command: AnyDomainCommand): Promise<AppViewState> { | ||
| const current = await this.currentState(); | ||
| const pending = decide(command, current); | ||
| if (pending.length === 0) return current; | ||
| const appended = await this.eventStore.append(pending); | ||
| await this.projector.enqueue(appended); | ||
| return this.currentState(); | ||
| } |
There was a problem hiding this comment.
execute() can return stale state.
enqueue() is async and projection happens later, but execute() immediately returns currentState(). This can resolve before the new events are projected. Consider awaiting a projector flush (or having enqueue() return a completion promise) before returning.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/core/src/application/engine.ts` around lines 37 - 44, The execute
method may return a stale AppViewState because projector.enqueue(appended) is
asynchronous and projection can finish later; change the flow so execute awaits
projection completion: update Projector.enqueue to return a Promise that
resolves when the appended events are fully projected (or add a
Projector.flush/completion method) and then in execute (function
execute(command: AnyDomainCommand)) await that promise after calling
eventStore.append(appended) before returning this.currentState(); ensure the
appended events are forwarded to the enqueue/flush call so the projection
completion corresponds to those events.
| export class EffectFanout implements FanoutPort { | ||
| private readonly sinks = new Set<Sink<StateUpdatedNotification>>(); | ||
|
|
||
| async publish(notification: StateUpdatedNotification): Promise<void> { | ||
| for (const sink of this.sinks) { | ||
| sink.push(notification); | ||
| } | ||
| } | ||
|
|
||
| async subscribe(): Promise<AsyncIterable<StateUpdatedNotification>> { | ||
| const sink = createSink<StateUpdatedNotification>(); | ||
| this.sinks.add(sink); | ||
| return sink.iterable; | ||
| } |
There was a problem hiding this comment.
Add unsubscribe/cleanup to prevent sink leaks.
subscribe() never removes sinks, so abandoned subscriptions accumulate forever and still receive pushes. Add iterator return()/throw() cleanup that closes the sink and removes it from this.sinks.
🔧 Suggested fix
async subscribe(): Promise<AsyncIterable<StateUpdatedNotification>> {
const sink = createSink<StateUpdatedNotification>();
this.sinks.add(sink);
- return sink.iterable;
+ return {
+ [Symbol.asyncIterator]: () => {
+ const iterator = sink.iterable[Symbol.asyncIterator]();
+ return {
+ next: () => iterator.next(),
+ return: async () => {
+ sink.close();
+ this.sinks.delete(sink);
+ return { value: undefined as never, done: true };
+ },
+ throw: async (err) => {
+ sink.close();
+ this.sinks.delete(sink);
+ throw err;
+ },
+ };
+ },
+ };
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/core/src/application/fanout.ts` around lines 50 - 63,
EffectFanout.subscribe currently adds a sink to the sinks set but never removes
it, leaking sinks when subscribers abandon their iterator; modify subscribe to
wrap/augment the AsyncIterable/AsyncIterator returned by
createSink<StateUpdatedNotification>() so that its return() and throw() methods
close the sink and remove it from this.sinks (and ensure any normal completion
also removes it), keeping publish() unchanged; reference EffectFanout,
subscribe, publish, sinks, and createSink to locate where to add iterator
cleanup logic.
| async start(): Promise<void> { | ||
| if (this.processing) return; | ||
| this.stopSignal = false; | ||
| this.processing = true; | ||
| void this.drain(); | ||
| } | ||
|
|
||
| async stop(): Promise<void> { | ||
| this.stopSignal = true; | ||
| this.processing = false; | ||
| } | ||
|
|
||
| async enqueue(events: ReadonlyArray<DomainEventEnvelope>): Promise<void> { | ||
| if (events.length === 0) return; | ||
| this.queue.push(events); | ||
| if (this.processing) { | ||
| void this.drain(); | ||
| } | ||
| } | ||
|
|
||
| async readState(): Promise<AppViewState> { | ||
| return (await this.projectionStore.readState()) ?? emptyAppViewState(); | ||
| } | ||
|
|
||
| private async drain(): Promise<void> { | ||
| if (!this.processing) return; | ||
| while (!this.stopSignal && this.queue.length > 0) { | ||
| const chunk = this.queue.shift(); | ||
| if (!chunk || chunk.length === 0) continue; | ||
| const current = (await this.projectionStore.readState()) ?? emptyAppViewState(); | ||
| const next = reduceEvents(chunk, current); | ||
| await this.projectionStore.writeState(next); | ||
| await this.fanout.publish({ state: next, events: chunk }); | ||
| } |
There was a problem hiding this comment.
Guard against concurrent drain() loops.
enqueue() can trigger multiple overlapping drain() executions, which can interleave and corrupt ordering. Add an in-flight drain guard (promise or flag) so only one drain runs at a time.
🔧 Suggested fix
export class QueueProjector {
private readonly queue: ReadonlyArray<DomainEventEnvelope>[] = [];
private processing = false;
private stopSignal = false;
+ private draining: Promise<void> | null = null;
@@
async enqueue(events: ReadonlyArray<DomainEventEnvelope>): Promise<void> {
if (events.length === 0) return;
this.queue.push(events);
if (this.processing) {
- void this.drain();
+ void this.drain();
}
}
@@
private async drain(): Promise<void> {
- if (!this.processing) return;
- while (!this.stopSignal && this.queue.length > 0) {
+ if (!this.processing) return;
+ if (this.draining) return this.draining;
+ this.draining = (async () => {
+ while (!this.stopSignal && this.queue.length > 0) {
const chunk = this.queue.shift();
if (!chunk || chunk.length === 0) continue;
const current = (await this.projectionStore.readState()) ?? emptyAppViewState();
const next = reduceEvents(chunk, current);
await this.projectionStore.writeState(next);
await this.fanout.publish({ state: next, events: chunk });
- }
+ }
+ })().finally(() => {
+ this.draining = null;
+ });
+ return this.draining;
}
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/core/src/application/projector.ts` around lines 16 - 49, The drain
loop can be started multiple times concurrently via enqueue(); add an "in-flight
drain" guard (e.g., a private property like inFlightDrain: Promise<void> | null)
so only one drain() runs at a time: have drain() set inFlightDrain to its
running Promise, clear inFlightDrain when the loop finishes, and update
enqueue() (and start()) to only call void this.drain() if inFlightDrain is null;
reference the existing methods drain(), enqueue(), start(), and the
processing/stopSignal flags when making the change.
| } | ||
| } | ||
|
|
||
| dispatch({ type: "DELETE_PROJECT", projectId }); |
There was a problem hiding this comment.
Project deletion broken in browser (non-Electron) mode
High Severity
In handleProjectContextMenu, the old dispatch({ type: "DELETE_PROJECT", projectId }) was removed without a replacement for the non-Electron code path. When isElectron is false, the api.projects.remove() call is inside the if (isElectron) guard, so it's skipped. The function prompts for confirmation and then does nothing — the project is never deleted either locally or on the server.
| id: result.project.id, | ||
| name: result.project.name, | ||
| cwd: result.project.cwd, | ||
| model: "gpt-5-codex", |
There was a problem hiding this comment.
Hardcoded model string doesn't match valid model options
Medium Severity
The model "gpt-5-codex" is hardcoded when adding a project and during bootstrap, but DEFAULT_MODEL is "gpt-5.3-codex" and "gpt-5-codex" is not in MODEL_OPTIONS. The server-side core package doesn't use resolveModelSlug, so this invalid model slug is stored as-is in the event store and projection, creating a mismatch with the client's model validation logic.
Additional Locations (1)
| running: event.type === "activity" ? event.hasRunningSubprocess : false, | ||
| }, | ||
| }); | ||
| }); |
There was a problem hiding this comment.
Unused bindTerminalEvents method is dead code
Low Severity
bindTerminalEvents is defined on CoreRuntime but never called anywhere. Terminal event dispatch is handled inline in onTerminalEvent within wsServer.ts. This creates confusing duplicate logic — one active (in wsServer) and one dead (in CoreRuntime) — that could diverge silently during future maintenance.
| return { | ||
| ...action.state, | ||
| threads: action.state.threads.map((thread) => normalizeThreadTerminals(thread)), | ||
| }; |
There was a problem hiding this comment.
SET_SERVER_STATE overwrites client-local optimistic state updates
Medium Severity
The SET_SERVER_STATE reducer replaces the entire client state via ...action.state, discarding any local-only optimistic updates (e.g., UPDATE_SESSION, PUSH_USER_MESSAGE, SET_ERROR, terminal toggling) that haven't yet been reflected in the server's projection. This causes visible UI regressions like user messages disappearing or session state reverting during active conversations.
| dispatch({ type: "SET_SERVER_STATE", state: snapshot as AppState }); | ||
| }); | ||
| }, [activeThreadId, dispatch]); | ||
| }, [dispatch]); |
There was a problem hiding this comment.
Git and checkpoint query caches never invalidated after turns
Medium Severity
The old EventRouter explicitly called invalidateGitQueries on turn/completed and invalidated checkpoint diff queries on checkpoint/captured provider events. These invalidations were removed along with the provider event subscription (api.providers.onEvent is now a no-op), with no replacement mechanism. After the AI completes a turn and modifies files, git status queries and checkpoint diff queries in React Query remain stale until manual refresh or window refocus.
| waiter({ value, done: false }); | ||
| return; | ||
| } | ||
| queue.push(value); |
There was a problem hiding this comment.
🟡 Medium application/fanout.ts:21
The queue array grows unboundedly if consumers are slower than producers. Consider adding a max queue size with backpressure (e.g., drop oldest, block, or error) to prevent memory exhaustion, or document this as intentional for expected usage patterns.
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file packages/core/src/application/fanout.ts around line 21:
The `queue` array grows unboundedly if consumers are slower than producers. Consider adding a max queue size with backpressure (e.g., drop oldest, block, or error) to prevent memory exhaustion, or document this as intentional for expected usage patterns.
Evidence trail:
packages/core/src/application/fanout.ts lines 11-21 (commit ac44561cafdcb8a14780fde9171b38ee9774eb40): Line 11 defines `const queue: T[] = []`, lines 14-21 show the `push` function that appends to `queue` without any size limit when no waiter is available.
| const pending = decide(command, current); | ||
| if (pending.length === 0) return current; | ||
| const appended = await this.eventStore.append(pending); | ||
| await this.projector.enqueue(appended); |
There was a problem hiding this comment.
🟠 High application/engine.ts:42
Projector work isn’t serialized/awaited: concurrent execute can start multiple drains and return before projections apply, so currentState() may be stale. Suggest a single‑flight QueueProjector (mutex/flag) and an enqueue that resolves when events are projected; execute should await it.
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file packages/core/src/application/engine.ts around line 42:
Projector work isn’t serialized/awaited: concurrent `execute` can start multiple `drain`s and return before projections apply, so `currentState()` may be stale. Suggest a single‑flight `QueueProjector` (mutex/flag) and an `enqueue` that resolves when events are projected; `execute` should await it.
Evidence trail:
packages/core/src/application/projector.ts lines 26-31: `enqueue` method uses `void this.drain()` which does not await the drain operation. packages/core/src/application/engine.ts lines 36-43: `execute` method awaits `enqueue` but since `enqueue` returns before drain completes, `currentState()` on line 42 may return stale state.
|
|
||
| async subscribe(): Promise<AsyncIterable<StateUpdatedNotification>> { | ||
| const sink = createSink<StateUpdatedNotification>(); | ||
| this.sinks.add(sink); |
There was a problem hiding this comment.
🟠 High application/fanout.ts:61
Sinks are never removed from this.sinks when consumers stop, which can leak memory. Suggest cleaning up by removing the sink when the async iterator finishes (implement return()), and/or exposing an unsubscribe mechanism.
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file packages/core/src/application/fanout.ts around line 61:
Sinks are never removed from `this.sinks` when consumers stop, which can leak memory. Suggest cleaning up by removing the sink when the async iterator finishes (implement `return()`), and/or exposing an unsubscribe mechanism.
| }); | ||
| } | ||
|
|
||
| async stop(): Promise<void> { |
There was a problem hiding this comment.
🟠 High src/coreRuntime.ts:37
Event listeners bound via bindProviderEvents and bindTerminalEvents are never removed in stop(). If events arrive after this.stores.db.close(), dispatch() will attempt to write to the closed database and crash. Consider storing the listener references and calling off() in stop().
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file apps/server/src/coreRuntime.ts around line 37:
Event listeners bound via `bindProviderEvents` and `bindTerminalEvents` are never removed in `stop()`. If events arrive after `this.stores.db.close()`, `dispatch()` will attempt to write to the closed database and crash. Consider storing the listener references and calling `off()` in `stop()`.
Evidence trail:
apps/server/src/coreRuntime.ts lines 37-40 (stop() method only calls engine.stop() and db.close(), no listener removal), lines 54-58 (bindProviderEvents adds listener with .on()), lines 60-74 (bindTerminalEvents adds listener with .on() that calls this.dispatch()), lines 46-48 (dispatch() calls engine.execute()), line 24 (engine constructed with stores) at commit ac44561cafdcb8a14780fde9171b38ee9774eb40
| async enqueue(events: ReadonlyArray<DomainEventEnvelope>): Promise<void> { | ||
| if (events.length === 0) return; | ||
| this.queue.push(events); | ||
| if (this.processing) { |
There was a problem hiding this comment.
🟠 High application/projector.ts:31
Multiple concurrent drain() loops can race when enqueue is called repeatedly. Consider adding a guard (e.g., draining flag) to prevent spawning a new drain when one is already active.
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file packages/core/src/application/projector.ts around line 31:
Multiple concurrent `drain()` loops can race when `enqueue` is called repeatedly. Consider adding a guard (e.g., `draining` flag) to prevent spawning a new `drain` when one is already active.
Evidence trail:
packages/core/src/application/projector.ts lines 8-48 at commit ac44561cafdcb8a14780fde9171b38ee9774eb40: The `processing` flag (line 8) only indicates whether the projector is started. In `enqueue` (lines 28-33), `void this.drain()` is called whenever `processing` is true, with no guard to check if a drain is already running. The `drain()` method (lines 39-48) is async and can be interrupted at any await point, allowing multiple concurrent instances.
| }); | ||
| transport.subscribe(WS_CHANNELS.stateUpdated, (data) => { | ||
| lastStateSnapshot = data; | ||
| for (const listener of stateListeners) { |
There was a problem hiding this comment.
🟡 Medium src/wsNativeApi.ts:68
Iterating stateListeners while a listener adds new listeners causes duplicates—the new listener gets the replayed value and then the loop visits it again. Consider snapshotting the set before iterating (e.g., [...stateListeners]).
| for (const listener of stateListeners) { | |
| for (const listener of [...stateListeners]) { |
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file apps/web/src/wsNativeApi.ts around line 68:
Iterating `stateListeners` while a listener adds new listeners causes duplicates—the new listener gets the replayed value and then the loop visits it again. Consider snapshotting the set before iterating (e.g., `[...stateListeners]`).
Evidence trail:
apps/web/src/wsNativeApi.ts lines 8, 34-46, 67-74 at commit ac44561cafdcb8a14780fde9171b38ee9774eb40. Line 8 shows `stateListeners` is a Set. Lines 34-46 show `onServerStateUpdate` adds listener to Set and replays `lastStateSnapshot`. Lines 67-74 show `lastStateSnapshot = data` is set before `for (const listener of stateListeners)` loop. ECMAScript Set iteration behavior: elements added during iteration are visited if added after current position.
|
Bugbot Autofix prepared fixes for 5 of the 5 bugs found in the latest run.
Or push these changes by commenting: Preview (8816ba5f81)diff --git a/apps/server/src/coreRuntime.ts b/apps/server/src/coreRuntime.ts
--- a/apps/server/src/coreRuntime.ts
+++ b/apps/server/src/coreRuntime.ts
@@ -1,11 +1,15 @@
import path from "node:path";
import crypto from "node:crypto";
-import type { ProviderEvent, ProviderSession, TerminalEvent } from "@t3tools/contracts";
-import { EffectFanout, OrchestrationEngine, QueueProjector, type AppViewState } from "@t3tools/core";
+import type { ProviderEvent, ProviderSession } from "@t3tools/contracts";
+import {
+ EffectFanout,
+ OrchestrationEngine,
+ QueueProjector,
+ type AppViewState,
+} from "@t3tools/core";
import { createSqliteStores } from "@t3tools/infra-sqlite";
import type { ProviderManager } from "./providerManager";
-import type { TerminalManager } from "./terminalManager";
function nowIso(): string {
return new Date().toISOString();
@@ -21,7 +25,11 @@
const dbPath = path.join(dbDir, "event-store.sqlite");
this.stores = createSqliteStores(dbPath);
this.projector = new QueueProjector(this.stores.projectionStore, this.fanout);
- this.engine = new OrchestrationEngine(this.stores.eventStore, this.stores.projectionStore, this.projector);
+ this.engine = new OrchestrationEngine(
+ this.stores.eventStore,
+ this.stores.projectionStore,
+ this.projector,
+ );
}
async start(cwd: string, projectName: string): Promise<void> {
@@ -57,22 +65,6 @@
});
}
- bindTerminalEvents(terminalManager: TerminalManager): void {
- terminalManager.on("event", (event: TerminalEvent) => {
- if (event.type !== "activity" && event.type !== "error" && event.type !== "exited") return;
- void this.dispatch({
- id: crypto.randomUUID(),
- type: "thread.setTerminalActivity",
- issuedAt: event.createdAt,
- payload: {
- threadId: event.threadId,
- terminalId: event.terminalId,
- running: event.type === "activity" ? event.hasRunningSubprocess : false,
- },
- });
- });
- }
-
async bindProviderSession(threadId: string, session: ProviderSession): Promise<void> {
await this.dispatch({
id: crypto.randomUUID(),
diff --git a/apps/server/src/wsServer.ts b/apps/server/src/wsServer.ts
--- a/apps/server/src/wsServer.ts
+++ b/apps/server/src/wsServer.ts
@@ -7,6 +7,7 @@
import type { Duplex } from "node:stream";
import {
+ DEFAULT_MODEL,
EDITORS,
WS_CHANNELS,
WS_METHODS,
@@ -304,7 +305,8 @@
switch (request.method) {
case WS_METHODS.providersStartSession: {
const session = await providerManager.startSession(request.params as never);
- const uiThreadId = typeof paramsObj.uiThreadId === "string" ? paramsObj.uiThreadId : undefined;
+ const uiThreadId =
+ typeof paramsObj.uiThreadId === "string" ? paramsObj.uiThreadId : undefined;
if (uiThreadId) {
await coreRuntime.bindProviderSession(uiThreadId, session);
}
@@ -313,8 +315,11 @@
case WS_METHODS.providersSendTurn: {
const sessionId = typeof paramsObj.sessionId === "string" ? paramsObj.sessionId : undefined;
- const uiThreadId = typeof paramsObj.uiThreadId === "string" ? paramsObj.uiThreadId : undefined;
- const targetThread = uiThreadId ? state.threads.find((thread) => thread.id === uiThreadId) : findThreadBySessionId(sessionId);
+ const uiThreadId =
+ typeof paramsObj.uiThreadId === "string" ? paramsObj.uiThreadId : undefined;
+ const targetThread = uiThreadId
+ ? state.threads.find((thread) => thread.id === uiThreadId)
+ : findThreadBySessionId(sessionId);
const inputText = typeof paramsObj.input === "string" ? paramsObj.input : undefined;
if (targetThread && inputText && inputText.trim().length > 0) {
await coreRuntime.dispatch({
@@ -363,52 +368,49 @@
case WS_METHODS.projectsList:
return projectRegistry.list();
- case WS_METHODS.projectsAdd:
- {
- const result = projectRegistry.add(request.params as never);
+ case WS_METHODS.projectsAdd: {
+ const result = projectRegistry.add(request.params as never);
+ await coreRuntime.dispatch({
+ id: crypto.randomUUID(),
+ type: "project.add",
+ issuedAt: requestNow,
+ payload: {
+ id: result.project.id,
+ name: result.project.name,
+ cwd: result.project.cwd,
+ model: DEFAULT_MODEL,
+ scripts: result.project.scripts,
+ },
+ });
+ return result;
+ }
+
+ case WS_METHODS.projectsRemove: {
+ const projectId = typeof paramsObj.id === "string" ? paramsObj.id : undefined;
+ if (projectId) {
await coreRuntime.dispatch({
id: crypto.randomUUID(),
- type: "project.add",
+ type: "project.remove",
issuedAt: requestNow,
- payload: {
- id: result.project.id,
- name: result.project.name,
- cwd: result.project.cwd,
- model: "gpt-5-codex",
- scripts: result.project.scripts,
- },
+ payload: { id: projectId },
});
- return result;
}
-
- case WS_METHODS.projectsRemove:
- {
- const projectId = typeof paramsObj.id === "string" ? paramsObj.id : undefined;
- if (projectId) {
- await coreRuntime.dispatch({
- id: crypto.randomUUID(),
- type: "project.remove",
- issuedAt: requestNow,
- payload: { id: projectId },
- });
- }
projectRegistry.remove(request.params as never);
return undefined;
- }
+ }
case WS_METHODS.projectsSearchEntries:
return searchWorkspaceEntries(request.params as never);
- case WS_METHODS.projectsUpdateScripts:
- {
- const result = projectRegistry.updateScripts(request.params as never);
- await coreRuntime.dispatch({
- id: crypto.randomUUID(),
- type: "project.updateScripts",
- issuedAt: requestNow,
- payload: { id: result.project.id, scripts: result.project.scripts },
- });
- return result;
- }
+ case WS_METHODS.projectsUpdateScripts: {
+ const result = projectRegistry.updateScripts(request.params as never);
+ await coreRuntime.dispatch({
+ id: crypto.randomUUID(),
+ type: "project.updateScripts",
+ issuedAt: requestNow,
+ payload: { id: result.project.id, scripts: result.project.scripts },
+ });
+ return result;
+ }
case WS_METHODS.shellOpenInEditor: {
const params = request.params as {
diff --git a/apps/web/src/components/Sidebar.tsx b/apps/web/src/components/Sidebar.tsx
--- a/apps/web/src/components/Sidebar.tsx
+++ b/apps/web/src/components/Sidebar.tsx
@@ -379,22 +379,18 @@
);
if (!confirmed) return;
- if (isElectron) {
- try {
- await api.projects.remove({ id: projectId });
- } catch (error) {
- const message =
- error instanceof Error ? error.message : "Unknown error deleting project.";
- console.error("Failed to remove project", { projectId, error });
- toastManager.add({
- type: "error",
- title: `Failed to delete "${project.name}"`,
- description: message,
- });
- return;
- }
+ try {
+ await api.projects.remove({ id: projectId });
+ } catch (error) {
+ const message = error instanceof Error ? error.message : "Unknown error deleting project.";
+ console.error("Failed to remove project", { projectId, error });
+ toastManager.add({
+ type: "error",
+ title: `Failed to delete "${project.name}"`,
+ description: message,
+ });
+ return;
}
-
},
[api, dispatch, state.projects, state.threads],
);
diff --git a/apps/web/src/routes/__root.tsx b/apps/web/src/routes/__root.tsx
--- a/apps/web/src/routes/__root.tsx
+++ b/apps/web/src/routes/__root.tsx
@@ -4,14 +4,16 @@
type ErrorComponentProps,
useParams,
} from "@tanstack/react-router";
-import { useEffect } from "react";
-import { QueryClient } from "@tanstack/react-query";
+import { useEffect, useRef } from "react";
+import { QueryClient, useQueryClient } from "@tanstack/react-query";
import { APP_DISPLAY_NAME } from "../branding";
import { Button } from "../components/ui/button";
import { AnchoredToastProvider, ToastProvider } from "../components/ui/toast";
import { isElectron } from "../env";
import { useNativeApi } from "../hooks/useNativeApi";
+import { invalidateGitQueries } from "../lib/gitReactQuery";
+import { providerQueryKeys } from "../lib/providerReactQuery";
import { type AppState, useStore } from "../store";
import { onServerStateUpdate, onServerWelcome } from "../wsNativeApi";
@@ -123,10 +125,12 @@
function EventRouter() {
const api = useNativeApi();
const { dispatch } = useStore();
+ const queryClient = useQueryClient();
const activeThreadId = useParams({
strict: false,
select: (params) => params.threadId,
});
+ const prevTurnCompletionsRef = useRef<Map<string, string | undefined>>(new Map());
useEffect(() => {
if (!api) return;
@@ -142,9 +146,28 @@
useEffect(() => {
return onServerStateUpdate((snapshot) => {
- dispatch({ type: "SET_SERVER_STATE", state: snapshot as AppState });
+ const serverState = snapshot as AppState;
+ dispatch({ type: "SET_SERVER_STATE", state: serverState });
+
+ let hasNewTurnCompletion = false;
+ for (const thread of serverState.threads) {
+ const prev = prevTurnCompletionsRef.current.get(thread.id);
+ if (thread.latestTurnCompletedAt && thread.latestTurnCompletedAt !== prev) {
+ hasNewTurnCompletion = true;
+ }
+ }
+ const nextMap = new Map<string, string | undefined>();
+ for (const thread of serverState.threads) {
+ nextMap.set(thread.id, thread.latestTurnCompletedAt);
+ }
+ prevTurnCompletionsRef.current = nextMap;
+
+ if (hasNewTurnCompletion) {
+ void invalidateGitQueries(queryClient);
+ void queryClient.invalidateQueries({ queryKey: providerQueryKeys.all });
+ }
});
- }, [dispatch]);
+ }, [dispatch, queryClient]);
useEffect(() => {
if (!activeThreadId) return;
diff --git a/apps/web/src/store.ts b/apps/web/src/store.ts
--- a/apps/web/src/store.ts
+++ b/apps/web/src/store.ts
@@ -127,7 +127,6 @@
.filter((id) => id.length > 0 && validTerminalIdSet.has(id));
}
-
function normalizeTerminalGroupIds(terminalIds: string[]): string[] {
return [...new Set(terminalIds.map((id) => id.trim()).filter((id) => id.length > 0))];
}
@@ -297,11 +296,26 @@
export function reducer(state: AppState, action: Action): AppState {
switch (action.type) {
- case "SET_SERVER_STATE":
+ case "SET_SERVER_STATE": {
+ const previousThreadById = new Map(
+ state.threads.map((thread) => [thread.id, thread] as const),
+ );
return {
...action.state,
- threads: action.state.threads.map((thread) => normalizeThreadTerminals(thread)),
+ threads: action.state.threads.map((thread) => {
+ const prev = previousThreadById.get(thread.id);
+ if (!prev) return normalizeThreadTerminals(thread);
+ return normalizeThreadTerminals({
+ ...thread,
+ terminalOpen: prev.terminalOpen,
+ terminalHeight: prev.terminalHeight,
+ activeTerminalId: prev.activeTerminalId,
+ terminalGroups: prev.terminalGroups,
+ activeTerminalGroupId: prev.activeTerminalGroupId,
+ });
+ }),
};
+ }
case "ADD_PROJECT":
if (state.projects.some((project) => project.cwd === action.project.cwd)) {
@@ -447,10 +461,7 @@
threads: updateThread(state.threads, action.threadId, (thread) => {
const normalizedThread = normalizeThreadTerminals(thread);
const isNewTerminal = !normalizedThread.terminalIds.includes(action.terminalId);
- if (
- isNewTerminal &&
- normalizedThread.terminalIds.length >= MAX_THREAD_TERMINAL_COUNT
- ) {
+ if (isNewTerminal && normalizedThread.terminalIds.length >= MAX_THREAD_TERMINAL_COUNT) {
return normalizedThread;
}
const terminalIds = normalizedThread.terminalIds.includes(action.terminalId)
@@ -520,10 +531,7 @@
threads: updateThread(state.threads, action.threadId, (thread) => {
const normalizedThread = normalizeThreadTerminals(thread);
const isNewTerminal = !normalizedThread.terminalIds.includes(action.terminalId);
- if (
- isNewTerminal &&
- normalizedThread.terminalIds.length >= MAX_THREAD_TERMINAL_COUNT
- ) {
+ if (isNewTerminal && normalizedThread.terminalIds.length >= MAX_THREAD_TERMINAL_COUNT) {
return normalizedThread;
}
const terminalIds = normalizedThread.terminalIds.includes(action.terminalId)
diff --git a/packages/core/src/projections/reducer.ts b/packages/core/src/projections/reducer.ts
--- a/packages/core/src/projections/reducer.ts
+++ b/packages/core/src/projections/reducer.ts
@@ -1,5 +1,11 @@
import type { DomainEventEnvelope } from "../domain/events";
-import type { AppViewState, ChatMessage, ProviderEvent, ProviderSessionView, ThreadView } from "../domain/models";
+import type {
+ AppViewState,
+ ChatMessage,
+ ProviderEvent,
+ ProviderSessionView,
+ ThreadView,
+} from "../domain/models";
import { emptyAppViewState } from "../domain/models";
function asObject(value: unknown): Record<string, unknown> | undefined {
@@ -27,7 +33,11 @@
const payload = asObject(event.payload);
if (event.method === "thread/started") {
const thread = asObject(payload?.thread);
- return { ...previous, threadId: asString(thread?.id) ?? event.threadId ?? previous.threadId, updatedAt: event.createdAt };
+ return {
+ ...previous,
+ threadId: asString(thread?.id) ?? event.threadId ?? previous.threadId,
+ updatedAt: event.createdAt,
+ };
}
if (event.method === "turn/started") {
const turn = asObject(payload?.turn);
@@ -70,7 +80,10 @@
return { ...previous, updatedAt: event.createdAt };
}
-function applyEventToMessages(previous: ReadonlyArray<ChatMessage>, event: ProviderEvent): ChatMessage[] {
+function applyEventToMessages(
+ previous: ReadonlyArray<ChatMessage>,
+ event: ProviderEvent,
+): ChatMessage[] {
const payload = asObject(event.payload);
if (event.method === "item/started") {
const item = asObject(payload?.item);
@@ -80,7 +93,13 @@
const seedText = asString(item?.text) ?? "";
return [
...previous.filter((entry) => entry.id !== itemId),
- { id: itemId, role: "assistant", text: seedText, createdAt: event.createdAt, streaming: true },
+ {
+ id: itemId,
+ role: "assistant",
+ text: seedText,
+ createdAt: event.createdAt,
+ streaming: true,
+ },
];
}
if (event.method === "item/agentMessage/delta") {
@@ -89,7 +108,10 @@
if (!itemId || !delta) return [...previous];
const idx = previous.findIndex((entry) => entry.id === itemId);
if (idx < 0) {
- return [...previous, { id: itemId, role: "assistant", text: delta, createdAt: event.createdAt, streaming: true }];
+ return [
+ ...previous,
+ { id: itemId, role: "assistant", text: delta, createdAt: event.createdAt, streaming: true },
+ ];
}
const next = [...previous];
const current = next[idx];
@@ -105,7 +127,16 @@
const fullText = asString(item?.text);
const idx = previous.findIndex((entry) => entry.id === itemId);
if (idx < 0) {
- return [...previous, { id: itemId, role: "assistant", text: fullText ?? "", createdAt: event.createdAt, streaming: false }];
+ return [
+ ...previous,
+ {
+ id: itemId,
+ role: "assistant",
+ text: fullText ?? "",
+ createdAt: event.createdAt,
+ streaming: false,
+ },
+ ];
}
const next = [...previous];
const current = next[idx];
@@ -156,7 +187,8 @@
switch (event.type) {
case "app.bootstrapped": {
const payload = event.payload as { cwd: string; projectName: string };
- if (state.projects.length > 0) return { ...state, threadsHydrated: true, lastPosition: event.position };
+ if (state.projects.length > 0)
+ return { ...state, threadsHydrated: true, lastPosition: event.position };
return {
...state,
projects: [
@@ -164,7 +196,7 @@
id: "bootstrap-project",
name: payload.projectName,
cwd: payload.cwd,
- model: "gpt-5-codex",
+ model: "gpt-5.3-codex",
expanded: true,
scripts: [],
},
@@ -181,7 +213,9 @@
model: string;
scripts: Array<{ id: string; name: string; command: string; keybinding?: string }>;
};
- if (state.projects.some((project) => project.id === payload.id || project.cwd === payload.cwd)) {
+ if (
+ state.projects.some((project) => project.id === payload.id || project.cwd === payload.cwd)
+ ) {
return { ...state, lastPosition: event.position };
}
return {
@@ -225,11 +259,19 @@
if (state.threads.some((thread) => thread.id === payload.id)) {
return { ...state, lastPosition: event.position };
}
- return { ...state, threads: [...state.threads, defaultThread(payload)], lastPosition: event.position };
+ return {
+ ...state,
+ threads: [...state.threads, defaultThread(payload)],
+ lastPosition: event.position,
+ };
}
case "thread.deleted": {
const payload = event.payload as { id: string };
- return { ...state, threads: state.threads.filter((thread) => thread.id !== payload.id), lastPosition: event.position };
+ return {
+ ...state,
+ threads: state.threads.filter((thread) => thread.id !== payload.id),
+ lastPosition: event.position,
+ };
}
case "thread.userMessageAdded": {
const payload = event.payload as {
@@ -293,11 +335,22 @@
threads: state.threads.map((thread) => {
if (thread.id !== payload.threadId) return thread;
const nextEvents = [payload.event, ...thread.events];
- const nextSession = thread.session ? evolveSession(thread.session, payload.event) : thread.session;
+ const nextSession = thread.session
+ ? evolveSession(thread.session, payload.event)
+ : thread.session;
const nextMessages = applyEventToMessages(thread.messages, payload.event);
- const nextTurnId = payload.event.method === "turn/started" ? eventTurnId(payload.event) ?? thread.latestTurnId : thread.latestTurnId;
- const nextTurnStartedAt = payload.event.method === "turn/started" ? payload.event.createdAt : thread.latestTurnStartedAt;
- const nextTurnCompletedAt = payload.event.method === "turn/completed" ? payload.event.createdAt : thread.latestTurnCompletedAt;
+ const nextTurnId =
+ payload.event.method === "turn/started"
+ ? (eventTurnId(payload.event) ?? thread.latestTurnId)
+ : thread.latestTurnId;
+ const nextTurnStartedAt =
+ payload.event.method === "turn/started"
+ ? payload.event.createdAt
+ : thread.latestTurnStartedAt;
+ const nextTurnCompletedAt =
+ payload.event.method === "turn/completed"
+ ? payload.event.createdAt
+ : thread.latestTurnCompletedAt;
const nextTurnDuration =
payload.event.method === "turn/completed" && thread.latestTurnStartedAt
? durationMs(thread.latestTurnStartedAt, payload.event.createdAt)
@@ -307,7 +360,10 @@
events: nextEvents,
messages: nextMessages,
session: nextSession,
- error: payload.event.kind === "error" && payload.event.message ? payload.event.message : thread.error,
+ error:
+ payload.event.kind === "error" && payload.event.message
+ ? payload.event.message
+ : thread.error,
latestTurnId: nextTurnId,
latestTurnStartedAt: nextTurnStartedAt,
latestTurnCompletedAt: nextTurnCompletedAt,
@@ -318,7 +374,11 @@
};
}
case "thread.branchSet": {
- const payload = event.payload as { threadId: string; branch: string | null; worktreePath: string | null };
+ const payload = event.payload as {
+ threadId: string;
+ branch: string | null;
+ worktreePath: string | null;
+ };
return {
...state,
threads: state.threads.map((thread) =>
@@ -365,6 +425,12 @@
}
}
-export function reduceEvents(events: ReadonlyArray<DomainEventEnvelope>, seed?: AppViewState): AppViewState {
- return events.reduce((state, event) => applyDomainEvent(state, event), seed ?? emptyAppViewState());
+export function reduceEvents(
+ events: ReadonlyArray<DomainEventEnvelope>,
+ seed?: AppViewState,
+): AppViewState {
+ return events.reduce(
+ (state, event) => applyDomainEvent(state, event),
+ seed ?? emptyAppViewState(),
+ );
} |



Summary
@t3tools/core(Effect-based domain + orchestration + projections).@t3tools/infra-sqlite(SQLite event store + projection state).state.updatedsnapshots to clients.Core Architecture
Service Modules and Responsibilities
apps/server/src/wsServer.tsTransport adapter for WebSocket RPC/push. Routes
state.*commands/queries to runtime, pushesstate.updated, and bridges non-state infra calls.apps/server/src/coreRuntime.tsServer composition root for core data flow. Boots engine, binds provider/terminal runtime signals, dispatches domain commands, exposes snapshot/subscription.
packages/core/src/domain/models.tsCanonical schemas/read models (
AppViewState, thread/project/session/message models).packages/core/src/domain/commands.tsCommand contracts for state mutations (
project.*,thread.*,runtime.*, bootstrap).packages/core/src/domain/events.tsAppend-only domain event envelope + typed event taxonomy.
packages/core/src/application/decide.tsDeterministic command-to-event decision logic.
packages/core/src/application/engine.tsOrchestration engine: load state, decide events, append to store, enqueue projection updates.
packages/core/src/application/projector.tsDeterministic projector pipeline from events to materialized
AppViewState.packages/core/src/application/fanout.tsIn-process pub/sub fanout for projection updates (
state.updated).packages/core/src/projections/reducer.tsPure reducer for applying domain events to projection state.
packages/infra-sqlite/src/sqliteStore.tsSQLite persistence: append-only events table + persisted projection state table.
apps/server/src/providerManager.tsandapps/server/src/terminalManager.tsRuntime integration adapters (provider/PTY execution). They emit runtime signals that are persisted via core commands rather than owning client state.
apps/web/src/wsNativeApi.tsThin client transport facade; subscribes to authoritative
state.updatedand issues explicit commands.apps/web/src/store.tsLightweight UI state container hydrated from server snapshots (not localStorage-derived authority).
Core Data Flow
state.*, provider action, terminal action).AppViewState.state.updatedto connected clients.Test plan
state.updated.