-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Move persistence to server-side with Effect and SQLite #88
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,108 @@ | ||
| import path from "node:path"; | ||
| import crypto from "node:crypto"; | ||
| import type { ProviderEvent, ProviderSession, TerminalEvent } from "@t3tools/contracts"; | ||
| import { EffectFanout, OrchestrationEngine, QueueProjector, type AppViewState } from "@t3tools/core"; | ||
| import { createSqliteStores } from "@t3tools/infra-sqlite"; | ||
|
|
||
| import type { ProviderManager } from "./providerManager"; | ||
| import type { TerminalManager } from "./terminalManager"; | ||
|
|
||
| function nowIso(): string { | ||
| return new Date().toISOString(); | ||
| } | ||
|
|
||
| export class CoreRuntime { | ||
| private readonly fanout = new EffectFanout(); | ||
| private readonly stores; | ||
| private readonly projector: QueueProjector; | ||
| private readonly engine: OrchestrationEngine; | ||
|
|
||
| constructor(dbDir: string) { | ||
| const dbPath = path.join(dbDir, "event-store.sqlite"); | ||
| this.stores = createSqliteStores(dbPath); | ||
| this.projector = new QueueProjector(this.stores.projectionStore, this.fanout); | ||
| this.engine = new OrchestrationEngine(this.stores.eventStore, this.stores.projectionStore, this.projector); | ||
| } | ||
|
|
||
| async start(cwd: string, projectName: string): Promise<void> { | ||
| await this.engine.start(); | ||
| await this.engine.execute({ | ||
| id: crypto.randomUUID(), | ||
| type: "app.bootstrap", | ||
| issuedAt: nowIso(), | ||
| payload: { cwd, projectName }, | ||
| }); | ||
| } | ||
|
|
||
| async stop(): Promise<void> { | ||
| await this.engine.stop(); | ||
| this.stores.db.close(); | ||
| } | ||
|
|
||
| async state(): Promise<AppViewState> { | ||
| return this.engine.currentState(); | ||
| } | ||
|
|
||
| async dispatch(command: Parameters<OrchestrationEngine["execute"]>[0]): Promise<AppViewState> { | ||
| return this.engine.execute(command); | ||
| } | ||
|
|
||
| subscribe() { | ||
| return this.fanout.subscribe(); | ||
| } | ||
|
|
||
| bindProviderEvents(providerManager: ProviderManager): void { | ||
| providerManager.on("event", (event: ProviderEvent) => { | ||
| void this.ingestProviderEvent(event); | ||
| }); | ||
| } | ||
|
Comment on lines
+54
to
+58
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unhandled errors in fire-and-forget provider event ingestion. The Consider adding error handling to maintain predictable behavior during failures. 🛡️ Proposed fix to log ingestion errors bindProviderEvents(providerManager: ProviderManager): void {
providerManager.on("event", (event: ProviderEvent) => {
- void this.ingestProviderEvent(event);
+ this.ingestProviderEvent(event).catch((err) => {
+ console.error("Failed to ingest provider event", { sessionId: event.sessionId, error: err });
+ });
});
}As per coding guidelines: "Maintain predictable behavior under load and during failures (session restarts, reconnects, partial streams)." 🤖 Prompt for AI Agents |
||
|
|
||
| bindTerminalEvents(terminalManager: TerminalManager): void { | ||
| terminalManager.on("event", (event: TerminalEvent) => { | ||
| if (event.type !== "activity" && event.type !== "error" && event.type !== "exited") return; | ||
| void this.dispatch({ | ||
| id: crypto.randomUUID(), | ||
| type: "thread.setTerminalActivity", | ||
| issuedAt: event.createdAt, | ||
| payload: { | ||
| threadId: event.threadId, | ||
| terminalId: event.terminalId, | ||
| running: event.type === "activity" ? event.hasRunningSubprocess : false, | ||
| }, | ||
| }); | ||
| }); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unused
|
||
| } | ||
|
|
||
| async bindProviderSession(threadId: string, session: ProviderSession): Promise<void> { | ||
| await this.dispatch({ | ||
| id: crypto.randomUUID(), | ||
| type: "thread.updateProviderSession", | ||
| issuedAt: nowIso(), | ||
| payload: { threadId, session }, | ||
| }); | ||
| } | ||
|
|
||
| async clearProviderSession(threadId: string): Promise<void> { | ||
| await this.dispatch({ | ||
| id: crypto.randomUUID(), | ||
| type: "thread.updateProviderSession", | ||
| issuedAt: nowIso(), | ||
| payload: { threadId, session: null }, | ||
| }); | ||
| } | ||
|
|
||
| async ingestProviderEvent(event: ProviderEvent): Promise<void> { | ||
| const state = await this.state(); | ||
| const target = state.threads.find((thread) => thread.session?.sessionId === event.sessionId); | ||
| if (!target) return; | ||
| await this.dispatch({ | ||
| id: crypto.randomUUID(), | ||
| type: "thread.recordProviderEvent", | ||
| issuedAt: event.createdAt, | ||
| payload: { | ||
| threadId: target.id, | ||
| event, | ||
| }, | ||
| }); | ||
| } | ||
| } | ||


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🟠 High
src/coreRuntime.ts:37Event listeners bound via
bindProviderEventsandbindTerminalEventsare never removed instop(). If events arrive afterthis.stores.db.close(),dispatch()will attempt to write to the closed database and crash. Consider storing the listener references and callingoff()instop().🚀 Reply "fix it for me" or copy this AI Prompt for your agent: