Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/opencode/src/effect/run-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ export function makeRuntime<I, S, E>(service: ServiceMap.Service<I, S>, layer: L

return {
runSync: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runSync(service.use(fn)),
runPromiseExit: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) =>
getRuntime().runPromiseExit(service.use(fn), options),
runPromise: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) =>
getRuntime().runPromise(service.use(fn), options),
runFork: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runFork(service.use(fn)),
Expand Down
113 changes: 56 additions & 57 deletions packages/opencode/src/session/compaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import { Agent } from "@/agent/agent"
import { Plugin } from "@/plugin"
import { Config } from "@/config/config"
import { NotFoundError } from "@/storage/db"
import { ProviderTransform } from "@/provider/transform"
import { ModelID, ProviderID } from "@/provider/schema"
import { Effect, Layer, ServiceMap } from "effect"
import { Cause, Effect, Exit, Layer, ServiceMap } from "effect"
import { makeRuntime } from "@/effect/run-service"
import { isOverflow as overflow } from "./overflow"

export namespace SessionCompaction {
const log = Log.create({ service: "session.compaction" })
Expand All @@ -31,7 +31,6 @@ export namespace SessionCompaction {
),
}

const COMPACTION_BUFFER = 20_000
export const PRUNE_MINIMUM = 20_000
export const PRUNE_PROTECT = 40_000
const PRUNE_PROTECTED_TOOLS = ["skill"]
Expand Down Expand Up @@ -64,7 +63,7 @@ export namespace SessionCompaction {
export const layer: Layer.Layer<
Service,
never,
Bus.Service | Config.Service | Session.Service | Agent.Service | Plugin.Service
Bus.Service | Config.Service | Session.Service | Agent.Service | Plugin.Service | SessionProcessor.Service
> = Layer.effect(
Service,
Effect.gen(function* () {
Expand All @@ -73,26 +72,13 @@ export namespace SessionCompaction {
const session = yield* Session.Service
const agents = yield* Agent.Service
const plugin = yield* Plugin.Service
const processors = yield* SessionProcessor.Service

const isOverflow = Effect.fn("SessionCompaction.isOverflow")(function* (input: {
tokens: MessageV2.Assistant["tokens"]
model: Provider.Model
}) {
const cfg = yield* config.get()
if (cfg.compaction?.auto === false) return false
const context = input.model.limit.context
if (context === 0) return false

const count =
input.tokens.total ||
input.tokens.input + input.tokens.output + input.tokens.cache.read + input.tokens.cache.write

const reserved =
cfg.compaction?.reserved ?? Math.min(COMPACTION_BUFFER, ProviderTransform.maxOutputTokens(input.model))
const usable = input.model.limit.input
? input.model.limit.input - reserved
: context - ProviderTransform.maxOutputTokens(input.model)
return count >= usable
return overflow({ cfg: yield* config.get(), tokens: input.tokens, model: input.model })
})

// goes backwards through parts until there are PRUNE_PROTECT tokens worth of tool
Expand Down Expand Up @@ -181,38 +167,6 @@ export namespace SessionCompaction {
? Provider.getModel(agent.model.providerID, agent.model.modelID)
: Provider.getModel(userMessage.model.providerID, userMessage.model.modelID),
)
const msg = (yield* session.updateMessage({
id: MessageID.ascending(),
role: "assistant",
parentID: input.parentID,
sessionID: input.sessionID,
mode: "compaction",
agent: "compaction",
variant: userMessage.variant,
summary: true,
path: {
cwd: Instance.directory,
root: Instance.worktree,
},
cost: 0,
tokens: {
output: 0,
input: 0,
reasoning: 0,
cache: { read: 0, write: 0 },
},
modelID: model.id,
providerID: model.providerID,
time: {
created: Date.now(),
},
})) as MessageV2.Assistant
const processor = SessionProcessor.create({
assistantMessage: msg,
sessionID: input.sessionID,
model,
abort: input.abort,
})
// Allow plugins to inject context or replace compaction prompt.
const compacting = yield* plugin.trigger(
"experimental.session.compacting",
Expand Down Expand Up @@ -251,8 +205,47 @@ When constructing the summary, try to stick to this template:
const msgs = structuredClone(messages)
yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs })
const modelMessages = yield* Effect.promise(() => MessageV2.toModelMessages(msgs, model, { stripMedia: true }))
const result = yield* Effect.promise(() =>
processor.process({
const msg = (yield* session.updateMessage({
id: MessageID.ascending(),
role: "assistant",
parentID: input.parentID,
sessionID: input.sessionID,
mode: "compaction",
agent: "compaction",
variant: userMessage.variant,
summary: true,
path: {
cwd: Instance.directory,
root: Instance.worktree,
},
cost: 0,
tokens: {
output: 0,
input: 0,
reasoning: 0,
cache: { read: 0, write: 0 },
},
modelID: model.id,
providerID: model.providerID,
time: {
created: Date.now(),
},
})) as MessageV2.Assistant
const processor = yield* processors.create({
assistantMessage: msg,
sessionID: input.sessionID,
model,
abort: input.abort,
})
const cancel = Effect.fn("SessionCompaction.cancel")(function* () {
if (!input.abort.aborted || msg.time.completed) return
msg.error = msg.error ?? new MessageV2.AbortedError({ message: "Aborted" }).toObject()
msg.finish = msg.finish ?? "error"
msg.time.completed = Date.now()
yield* session.updateMessage(msg)
})
const result = yield* processor
.process({
user: userMessage,
agent,
abort: input.abort,
Expand All @@ -267,8 +260,8 @@ When constructing the summary, try to stick to this template:
},
],
model,
}),
)
})
.pipe(Effect.ensuring(cancel()))

if (result === "compact") {
processor.message.error = new MessageV2.ContextOverflowError({
Expand Down Expand Up @@ -383,6 +376,7 @@ When constructing the summary, try to stick to this template:
Effect.sync(() =>
layer.pipe(
Layer.provide(Session.defaultLayer),
Layer.provide(SessionProcessor.defaultLayer),
Layer.provide(Agent.defaultLayer),
Layer.provide(Plugin.defaultLayer),
Layer.provide(Bus.layer),
Expand All @@ -391,7 +385,7 @@ When constructing the summary, try to stick to this template:
),
)

const { runPromise } = makeRuntime(Service, defaultLayer)
const { runPromise, runPromiseExit } = makeRuntime(Service, defaultLayer)

export async function isOverflow(input: { tokens: MessageV2.Assistant["tokens"]; model: Provider.Model }) {
return runPromise((svc) => svc.isOverflow(input))
Expand All @@ -409,7 +403,12 @@ When constructing the summary, try to stick to this template:
auto: boolean
overflow?: boolean
}) {
return runPromise((svc) => svc.process(input))
const exit = await runPromiseExit((svc) => svc.process(input), { signal: input.abort })
if (Exit.isFailure(exit)) {
if (Cause.hasInterrupts(exit.cause) && input.abort.aborted) return "stop"
throw Cause.squash(exit.cause)
}
return exit.value
}

export const create = fn(
Expand Down
31 changes: 31 additions & 0 deletions packages/opencode/src/session/llm.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { Provider } from "@/provider/provider"
import { Log } from "@/util/log"
import { Effect, Layer, ServiceMap } from "effect"
import * as Stream from "effect/Stream"
import { streamText, wrapLanguageModel, type ModelMessage, type Tool, tool, jsonSchema } from "ai"
import { mergeDeep, pipe } from "remeda"
import { GitLabWorkflowLanguageModel } from "gitlab-ai-provider"
Expand Down Expand Up @@ -34,6 +36,35 @@ export namespace LLM {
toolChoice?: "auto" | "required" | "none"
}

/**
 * A single streaming event as emitted by `stream` — inferred from the element
 * type of the AI SDK's `fullStream` AsyncIterable so it tracks the SDK's
 * event union without restating it.
 */
export type Event = Awaited<ReturnType<typeof stream>>["fullStream"] extends AsyncIterable<infer T> ? T : never

/**
 * Service contract: exposes `stream` as an Effect `Stream` of events
 * (error channel is `unknown` — iteration errors are passed through as-is).
 */
export interface Interface {
  readonly stream: (input: StreamInput) => Stream.Stream<Event, unknown>
}

/** ServiceMap tag used to resolve the LLM service from an Effect context. */
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/LLM") {}

/**
 * Effect Layer providing the LLM service. Wraps the promise-based
 * `LLM.stream` in a lazy Effect `Stream`: `Stream.unwrap` defers the
 * `Effect.promise` call, so the underlying request only starts when the
 * returned stream is actually consumed.
 */
export const layer = Layer.effect(
  Service,
  Effect.gen(function* () {
    return Service.of({
      stream(input) {
        return Stream.unwrap(
          Effect.promise(() => LLM.stream(input)).pipe(
            // Adapt the AI SDK's AsyncIterable of events into an Effect Stream.
            // Iteration failures are surfaced unchanged on the error channel.
            // (The previous `Stream.mapEffect((event) => Effect.succeed(event))`
            // stage was an identity transform and has been removed.)
            Effect.map((result) => Stream.fromAsyncIterable(result.fullStream, (err) => err)),
          ),
        )
      },
    })
  }),
)

export const defaultLayer = layer

export async function stream(input: StreamInput) {
const l = log
.clone()
Expand Down
22 changes: 22 additions & 0 deletions packages/opencode/src/session/overflow.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import type { Config } from "@/config/config"
import type { Provider } from "@/provider/provider"
import { ProviderTransform } from "@/provider/transform"
import type { MessageV2 } from "./message-v2"

// Default number of tokens to keep free for the model's reply when the
// config does not specify a reserve.
const COMPACTION_BUFFER = 20_000

/**
 * Reports whether a message's token usage has grown past the model's usable
 * context window, i.e. whether the session should be compacted.
 *
 * Returns `false` immediately when auto-compaction is disabled in config, or
 * when the model reports a context limit of 0 (no usable limit to compare
 * against — presumably "unknown"; confirm against provider metadata).
 */
export function isOverflow(input: { cfg: Config.Info; tokens: MessageV2.Assistant["tokens"]; model: Provider.Model }) {
  const { cfg, tokens, model } = input

  // Feature switch: `compaction.auto === false` opts out entirely.
  if (cfg.compaction?.auto === false) return false

  // No context limit reported — nothing to overflow.
  if (model.limit.context === 0) return false

  // Prefer the provider-reported total; fall back to summing the parts
  // (a reported total of 0 also triggers the fallback, by design of `||`).
  const used = tokens.total || tokens.input + tokens.output + tokens.cache.read + tokens.cache.write

  // Tokens held back for the response: explicit config value, otherwise the
  // smaller of the fixed buffer and the model's max output budget.
  const reserved = cfg.compaction?.reserved ?? Math.min(COMPACTION_BUFFER, ProviderTransform.maxOutputTokens(input.model))

  // With a dedicated input limit, measure against it minus the reserve;
  // otherwise carve the max output budget out of the total context window.
  const budget = model.limit.input
    ? model.limit.input - reserved
    : model.limit.context - ProviderTransform.maxOutputTokens(input.model)

  return used >= budget
}
Loading
Loading