diff --git a/packages/opencode/src/plugin/index.ts b/packages/opencode/src/plugin/index.ts index b05dd86259d9..5724f12a0c13 100644 --- a/packages/opencode/src/plugin/index.ts +++ b/packages/opencode/src/plugin/index.ts @@ -258,13 +258,20 @@ export namespace Plugin { } // Subscribe to bus events, fiber interrupted when scope closes + // Run plugin event handlers in parallel so slow handlers don't block the main event stream yield* bus.subscribeAll().pipe( Stream.runForEach((input) => - Effect.sync(() => { - for (const hook of hooks) { - hook["event"]?.({ event: input as any }) - } - }), + Effect.forEach( + hooks, + (hook) => + Effect.tryPromise({ + try: () => Promise.resolve(hook["event"]?.({ event: input as any })), + catch: (err) => { + log.error("plugin event handler failed", { error: err }) + }, + }).pipe(Effect.ignore), + { concurrency: "unbounded", discard: true }, + ), ), Effect.forkScoped, ) diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index b632a61a18e8..0852f1a82426 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -46,12 +46,14 @@ export namespace SessionProcessor { interface ProcessorContext extends Input { toolcalls: Record + toolCallHistory: MessageV2.ToolPart[] shouldBreak: boolean snapshot: string | undefined blocked: boolean needsCompaction: boolean currentText: MessageV2.TextPart | undefined reasoningMap: Record + stepCount: number } type StreamEvent = Event @@ -89,12 +91,14 @@ export namespace SessionProcessor { sessionID: input.sessionID, model: input.model, toolcalls: {}, + toolCallHistory: [], shouldBreak: false, snapshot: undefined, blocked: false, needsCompaction: false, currentText: undefined, reasoningMap: {}, + stepCount: 0, } let aborted = false @@ -180,12 +184,13 @@ export namespace SessionProcessor { metadata: value.providerMetadata, } satisfies MessageV2.ToolPart) - const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id)) - const recentParts = parts.slice(-DOOM_LOOP_THRESHOLD) + // Track tool call history in-memory for doom loop detection + ctx.toolCallHistory.push(ctx.toolcalls[value.toolCallId]) + const recentCalls = ctx.toolCallHistory.slice(-DOOM_LOOP_THRESHOLD) if ( - recentParts.length !== DOOM_LOOP_THRESHOLD || - !recentParts.every( + recentCalls.length !== DOOM_LOOP_THRESHOLD || + !recentCalls.every( (part) => part.type === "tool" && part.tool === value.toolName && @@ -294,12 +299,15 @@ export namespace SessionProcessor { } ctx.snapshot = undefined } - yield* Effect.promise(() => - SessionSummary.summarize({ - sessionID: ctx.sessionID, - messageID: ctx.assistantMessage.parentID, - }), - ).pipe(Effect.ignoreCause({ log: true, message: "session summary failed" }), Effect.forkDetach) + ctx.stepCount++ + if (ctx.stepCount === 1) { + yield* Effect.promise(() => + SessionSummary.summarize({ + sessionID: ctx.sessionID, + messageID: ctx.assistantMessage.parentID, + }), + ).pipe(Effect.ignoreCause({ log: true, message: "session summary failed" }), Effect.forkDetach) + } if ( !ctx.assistantMessage.summary && isOverflow({ cfg: yield* config.get(), tokens: usage.tokens, model: ctx.model }) diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index dbf815bd6d79..8d86aa312750 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -100,13 +100,21 @@ export namespace SessionPrompt { const cache = yield* InstanceState.make( Effect.fn("SessionPrompt.state")(function* () { const runners = new Map>() + const cachedMessages = new Map() + const cachedTools = new Map>() + const lastAgentName = new Map() + const lastModelKey = new Map() yield* Effect.addFinalizer( Effect.fnUntraced(function* () { yield* Effect.forEach(runners.values(), (r) => r.cancel, { concurrency: "unbounded", discard: true }) runners.clear() + cachedMessages.clear() + cachedTools.clear() + lastAgentName.clear() + lastModelKey.clear() }), ) - return { runners } + return { runners, cachedMessages, cachedTools, lastAgentName, lastModelKey } }), ) @@ -1333,12 +1341,17 @@ NOTE: At any point in time through this workflow you should feel free to ask the let structured: unknown | undefined let step = 0 const session = yield* sessions.get(sessionID) + const s = yield* InstanceState.get(cache) while (true) { yield* status.set(sessionID, { type: "busy" }) log.info("loop", { step, sessionID }) - let msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(sessionID))) + // Use cached messages on subsequent iterations to avoid DB re-read + let msgs = + step > 0 && s.cachedMessages.has(sessionID) + ? s.cachedMessages.get(sessionID)! + : yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(sessionID))) let lastUser: MessageV2.User | undefined let lastAssistant: MessageV2.Assistant | undefined @@ -1389,6 +1402,9 @@ NOTE: At any point in time through this workflow you should feel free to ask the auto: task.auto, overflow: task.overflow, }) + // Invalidate caches after compaction since messages change + s.cachedMessages.delete(sessionID) + s.cachedTools.delete(sessionID) if (result === "stop") break continue } @@ -1399,6 +1415,9 @@ NOTE: At any point in time through this workflow you should feel free to ask the (yield* compaction.isOverflow({ tokens: lastFinished.tokens, model })) ) { yield* compaction.create({ sessionID, agent: lastUser.agent, model: lastUser.model, auto: true }) + // Invalidate caches after compaction + s.cachedMessages.delete(sessionID) + s.cachedTools.delete(sessionID) continue } @@ -1441,15 +1460,23 @@ NOTE: At any point in time through this workflow you should feel free to ask the const lastUserMsg = msgs.findLast((m) => m.info.role === "user") const bypassAgentCheck = lastUserMsg?.parts.some((p) => p.type === "agent") ?? false - const tools = yield* resolveTools({ - agent, - session, - model, - tools: lastUser.tools, - processor: handle, - bypassAgentCheck, - messages: msgs, - }) + // Use cached tools if agent and model haven't changed + const modelKey = `${model.providerID}/${model.id}` + const toolsCacheValid = + s.cachedTools.has(sessionID) && + s.lastAgentName.get(sessionID) === agent.name && + s.lastModelKey.get(sessionID) === modelKey + const tools: Record = toolsCacheValid + ? s.cachedTools.get(sessionID)! + : yield* resolveTools({ + agent, + session, + model, + tools: lastUser.tools, + processor: handle, + bypassAgentCheck, + messages: msgs, + }) if (lastUser.format?.type === "json_schema") { tools["StructuredOutput"] = createStructuredOutputTool({ @@ -1534,6 +1561,13 @@ NOTE: At any point in time through this workflow you should feel free to ask the overflow: !handle.message.finish, }) } + + // Cache messages and tools for next iteration to avoid DB re-read + s.cachedMessages.set(sessionID, msgs) + s.cachedTools.set(sessionID, tools) + s.lastAgentName.set(sessionID, agent.name) + s.lastModelKey.set(sessionID, `${model.providerID}/${model.id}`) + return "continue" as const }), Effect.fnUntraced(function* (exit) {