From 9b8dbeda78ebec79e5d1ee4e0bc6711a1e569cda Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Mon, 30 Mar 2026 17:59:19 -0400 Subject: [PATCH] refactor(session): effectify SessionSummary service Convert SessionSummary to Effect service pattern with Interface, Service, Layer, and async facades. Use Session.Service and Bus.Service directly instead of facades. --- .../opencode/src/server/routes/session.ts | 4 +- packages/opencode/src/session/processor.ts | 10 +- packages/opencode/src/session/summary.ts | 208 ++++++++++-------- 3 files changed, 118 insertions(+), 104 deletions(-) diff --git a/packages/opencode/src/server/routes/session.ts b/packages/opencode/src/server/routes/session.ts index 23615d39abcc..c33c5e989b37 100644 --- a/packages/opencode/src/server/routes/session.ts +++ b/packages/opencode/src/server/routes/session.ts @@ -436,13 +436,13 @@ export const SessionRoutes = lazy(() => validator( "param", z.object({ - sessionID: SessionSummary.diff.schema.shape.sessionID, + sessionID: SessionSummary.DiffInput.shape.sessionID, }), ), validator( "query", z.object({ - messageID: SessionSummary.diff.schema.shape.messageID, + messageID: SessionSummary.DiffInput.shape.messageID, }), ), async (c) => { diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index b632a61a18e8..2482e40fb346 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -294,12 +294,10 @@ export namespace SessionProcessor { } ctx.snapshot = undefined } - yield* Effect.promise(() => - SessionSummary.summarize({ - sessionID: ctx.sessionID, - messageID: ctx.assistantMessage.parentID, - }), - ).pipe(Effect.ignoreCause({ log: true, message: "session summary failed" }), Effect.forkDetach) + SessionSummary.summarize({ + sessionID: ctx.sessionID, + messageID: ctx.assistantMessage.parentID, + }) if ( !ctx.assistantMessage.summary && isOverflow({ cfg: yield* config.get(), tokens: usage.tokens, model: ctx.model }) diff --git a/packages/opencode/src/session/summary.ts b/packages/opencode/src/session/summary.ts index c65cb9d0e00a..dbde91214300 100644 --- a/packages/opencode/src/session/summary.ts +++ b/packages/opencode/src/session/summary.ts @@ -1,14 +1,12 @@ -import { fn } from "@/util/fn" import z from "zod" +import { Effect, Layer, ServiceMap } from "effect" +import { makeRuntime } from "@/effect/run-service" +import { Bus } from "@/bus" +import { Snapshot } from "@/snapshot" +import { Storage } from "@/storage/storage" import { Session } from "." - import { MessageV2 } from "./message-v2" import { SessionID, MessageID } from "./schema" -import { Snapshot } from "@/snapshot" - -import { Storage } from "@/storage/storage" -import { Bus } from "@/bus" -import { NotFoundError } from "@/storage/db" export namespace SessionSummary { function unquoteGitPath(input: string) { @@ -67,103 +65,121 @@ export namespace SessionSummary { return Buffer.from(bytes).toString() } - export const summarize = fn( - z.object({ - sessionID: SessionID.zod, - messageID: MessageID.zod, - }), - async (input) => { - await Session.messages({ sessionID: input.sessionID }) - .then((all) => - Promise.all([ - summarizeSession({ sessionID: input.sessionID, messages: all }), - summarizeMessage({ messageID: input.messageID, messages: all }), - ]), - ) - .catch((err) => { - if (NotFoundError.isInstance(err)) return - throw err + export interface Interface { + readonly summarize: (input: { sessionID: SessionID; messageID: MessageID }) => Effect.Effect + readonly diff: (input: { sessionID: SessionID; messageID?: MessageID }) => Effect.Effect + readonly computeDiff: (input: { messages: MessageV2.WithParts[] }) => Effect.Effect + } + + export class Service extends ServiceMap.Service()("@opencode/SessionSummary") {} + + export const layer = Layer.effect( + Service, + Effect.gen(function* () { + const sessions = yield* Session.Service + const snapshot = yield* Snapshot.Service + const storage = yield* Storage.Service + const bus = yield* Bus.Service + + const computeDiff = Effect.fn("SessionSummary.computeDiff")(function* (input: { + messages: MessageV2.WithParts[] + }) { + let from: string | undefined + let to: string | undefined + for (const item of input.messages) { + if (!from) { + for (const part of item.parts) { + if (part.type === "step-start" && part.snapshot) { + from = part.snapshot + break + } + } + } + for (const part of item.parts) { + if (part.type === "step-finish" && part.snapshot) to = part.snapshot + } + } + if (from && to) return yield* snapshot.diffFull(from, to) + return [] + }) + + const summarize = Effect.fn("SessionSummary.summarize")(function* (input: { + sessionID: SessionID + messageID: MessageID + }) { + const all = yield* sessions.messages({ sessionID: input.sessionID }) + if (!all.length) return + + const diffs = yield* computeDiff({ messages: all }) + yield* sessions.setSummary({ + sessionID: input.sessionID, + summary: { + additions: diffs.reduce((sum, x) => sum + x.additions, 0), + deletions: diffs.reduce((sum, x) => sum + x.deletions, 0), + files: diffs.length, + }, }) - }, - ) + yield* storage.write(["session_diff", input.sessionID], diffs).pipe(Effect.ignore) + yield* bus.publish(Session.Event.Diff, { sessionID: input.sessionID, diff: diffs }) - async function summarizeSession(input: { sessionID: SessionID; messages: MessageV2.WithParts[] }) { - const diffs = await computeDiff({ messages: input.messages }) - await Session.setSummary({ - sessionID: input.sessionID, - summary: { - additions: diffs.reduce((sum, x) => sum + x.additions, 0), - deletions: diffs.reduce((sum, x) => sum + x.deletions, 0), - files: diffs.length, - }, - }) - await Storage.write(["session_diff", input.sessionID], diffs) - Bus.publish(Session.Event.Diff, { - sessionID: input.sessionID, - diff: diffs, - }) - } + const messages = all.filter( + (m) => + m.info.id === input.messageID || (m.info.role === "assistant" && m.info.parentID === input.messageID), + ) + const target = messages.find((m) => m.info.id === input.messageID) + if (!target || target.info.role !== "user") return + const msgDiffs = yield* computeDiff({ messages }) + target.info.summary = { ...target.info.summary, diffs: msgDiffs } + yield* sessions.updateMessage(target.info) + }) - async function summarizeMessage(input: { messageID: string; messages: MessageV2.WithParts[] }) { - const messages = input.messages.filter( - (m) => m.info.id === input.messageID || (m.info.role === "assistant" && m.info.parentID === input.messageID), - ) - const msgWithParts = messages.find((m) => m.info.id === input.messageID) - if (!msgWithParts || msgWithParts.info.role !== "user") return - const userMsg = msgWithParts.info - const diffs = await computeDiff({ messages }) - userMsg.summary = { - ...userMsg.summary, - diffs, - } - await Session.updateMessage(userMsg) - } + const diff = Effect.fn("SessionSummary.diff")(function* (input: { + sessionID: SessionID + messageID?: MessageID + }) { + const diffs = yield* storage.read(["session_diff", input.sessionID]).pipe( + Effect.catch(() => Effect.succeed([] as Snapshot.FileDiff[])), + ) + const next = diffs.map((item) => { + const file = unquoteGitPath(item.file) + if (file === item.file) return item + return { ...item, file } + }) + const changed = next.some((item, i) => item.file !== diffs[i]?.file) + if (changed) yield* storage.write(["session_diff", input.sessionID], next).pipe(Effect.ignore) + return next + }) - export const diff = fn( - z.object({ - sessionID: SessionID.zod, - messageID: MessageID.zod.optional(), + return Service.of({ summarize, diff, computeDiff }) }), - async (input) => { - const diffs = await Storage.read(["session_diff", input.sessionID]).catch(() => []) - const next = diffs.map((item) => { - const file = unquoteGitPath(item.file) - if (file === item.file) return item - return { - ...item, - file, - } - }) - const changed = next.some((item, i) => item.file !== diffs[i]?.file) - if (changed) Storage.write(["session_diff", input.sessionID], next).catch(() => {}) - return next - }, ) - export async function computeDiff(input: { messages: MessageV2.WithParts[] }) { - let from: string | undefined - let to: string | undefined - - // scan assistant messages to find earliest from and latest to - // snapshot - for (const item of input.messages) { - if (!from) { - for (const part of item.parts) { - if (part.type === "step-start" && part.snapshot) { - from = part.snapshot - break - } - } - } + export const defaultLayer = Layer.unwrap( + Effect.sync(() => + layer.pipe( + Layer.provide(Session.defaultLayer), + Layer.provide(Snapshot.defaultLayer), + Layer.provide(Storage.defaultLayer), + Layer.provide(Bus.layer), + ), + ), + ) - for (const part of item.parts) { - if (part.type === "step-finish" && part.snapshot) { - to = part.snapshot - } - } - } + const { runPromise } = makeRuntime(Service, defaultLayer) - if (from && to) return Snapshot.diffFull(from, to) - return [] + export const summarize = (input: { sessionID: SessionID; messageID: MessageID }) => + void runPromise((svc) => svc.summarize(input)).catch(() => {}) + + export const DiffInput = z.object({ + sessionID: SessionID.zod, + messageID: MessageID.zod.optional(), + }) + + export async function diff(input: z.infer) { + return runPromise((svc) => svc.diff(input)) + } + + export async function computeDiff(input: { messages: MessageV2.WithParts[] }) { + return runPromise((svc) => svc.computeDiff(input)) } }