Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
5719a13
refactor(session): effectify SessionPrompt service
kitlangton Mar 28, 2026
76373f0
use Session.Service and Agent.Service directly instead of Effect.prom…
kitlangton Mar 28, 2026
be20ea0
use SessionProcessor.Service, Session.Service, Agent.Service directly…
kitlangton Mar 28, 2026
51ad480
add Effect-based prompt tests covering loop lifecycle, cancel, concur…
kitlangton Mar 28, 2026
2d9a310
use shorthand properties in Service.of, Fiber.interruptAll in finalizer
kitlangton Mar 28, 2026
f5bf694
use SessionCompaction.Service and Plugin.Service directly in layer
kitlangton Mar 28, 2026
4f78427
effectify resolvePromptParts: use AppFileSystem and Agent service dir…
kitlangton Mar 28, 2026
732bbde
effectify resolveCommand: use Command.Service, Agent, Plugin directly…
kitlangton Mar 28, 2026
8c10edc
remove dead resolveCommand and resolvePromptPartsImpl
kitlangton Mar 29, 2026
edc98d5
effectify ensureTitle: move into layer, use agents/sessions services,…
kitlangton Mar 29, 2026
06befef
simplify title: consolidate Effect.promise calls, use fiber signal, s…
kitlangton Mar 29, 2026
b41bb7e
extract getModel helper, simplify Promise.resolve wrapper
kitlangton Mar 29, 2026
7c9ccde
fix type errors after rebase: add casts for Session.updateMessage/upd…
kitlangton Mar 29, 2026
724e599
effectify createUserMessage: move into layer as createMessage, delete…
kitlangton Mar 29, 2026
eb05bda
remove abort from llm and processor services
kitlangton Mar 29, 2026
cdac7a7
remove abort from compaction service
kitlangton Mar 29, 2026
b23e5b7
make Session.updateMessage/updatePart generic, remove all MessageV2 c…
kitlangton Mar 29, 2026
41b92cb
restore prompt runtime after rebase drift
kitlangton Mar 29, 2026
9c2a06d
effectify prompt reminder injection
kitlangton Mar 29, 2026
eaf3454
effectify prompt tool resolution
kitlangton Mar 29, 2026
bbed140
effectify prompt subtask handling
kitlangton Mar 29, 2026
8b04ddc
effectify prompt shell execution
kitlangton Mar 29, 2026
ec8f7af
make llm service cancellation interrupt-safe
kitlangton Mar 30, 2026
1ff78fe
scope instruction prompt cleanup to message creation
kitlangton Mar 30, 2026
969e633
effectify prompt message part expansion
kitlangton Mar 30, 2026
bf5940d
tighten prompt cleanup assertions
kitlangton Mar 30, 2026
9e41d02
tighten prompt assistant message typing
kitlangton Mar 30, 2026
1849f09
tighten processor part typing
kitlangton Mar 30, 2026
c476365
fix session cleanup edge cases
kitlangton Mar 30, 2026
6389a0d
add single flight coordination primitive
kitlangton Mar 30, 2026
09544db
simplify single flight api
kitlangton Mar 30, 2026
1e2d6c2
simplify SingleFlight: 3 states, no onInterrupt, uninterruptible start
kitlangton Mar 30, 2026
0608d08
integrate SingleFlight into prompt coordination layer
kitlangton Mar 30, 2026
9bc52ad
catch Cancelled in loop to return last assistant on cancel
kitlangton Mar 30, 2026
18b31ea
use Schema.TaggedErrorClass for SingleFlight.Cancelled
kitlangton Mar 30, 2026
1fd1046
rename SingleFlight.join to SingleFlight.await
kitlangton Mar 30, 2026
86a9b57
inline makeFlight into loop, revert shell to forkChild
kitlangton Mar 30, 2026
dd26552
replace SingleFlight with per-session runner state machine
kitlangton Mar 30, 2026
8208783
add Runner: per-key actor with 4-state machine
kitlangton Mar 30, 2026
d267d74
extract Runner to standalone module, integrate into prompt.ts
kitlangton Mar 30, 2026
2d50cd9
remove old session-runner prototype test
kitlangton Mar 30, 2026
90c4c4a
refine Runner API: namespace pattern, _tag states, generic E type
kitlangton Mar 30, 2026
3d2e387
make Runner error type honest, remove casts
kitlangton Mar 30, 2026
bc7adfe
fix runner cancellation and session abort handling
kitlangton Mar 30, 2026
b0ef301
fix prompt follow-up regressions and cleanup
kitlangton Mar 30, 2026
9d74250
Merge origin/dev into worktree-effectify-session-prompt
kitlangton Mar 30, 2026
ba619e0
test(server): isolate session server state
kitlangton Mar 30, 2026
ee0b2e5
test(session): clarify active-run prompt coverage
kitlangton Mar 30, 2026
3a110c1
refactor(runner): simplify Match transition
kitlangton Mar 30, 2026
c629529
revert(runner): restore switch-based state transition
kitlangton Mar 30, 2026
ad7f149
refactor(runner): simplify ensureRunning transition
kitlangton Mar 30, 2026
776b9ae
refactor(runner): trim redundant state assertions
kitlangton Mar 30, 2026
c3aabbd
refactor(session): drop unused message update helpers
kitlangton Mar 30, 2026
c96d306
refactor(prompt): drop unused wrapper metadata
kitlangton Mar 30, 2026
14d088f
refactor(prompt): narrow Effect error types from unknown to never
kitlangton Mar 30, 2026
f04ca44
refactor(runner): absorb Cancelled internally, remove from public type
kitlangton Mar 30, 2026
827f5d0
refactor(runner): collapse duplicate cases, use fnUntraced consistently
kitlangton Mar 30, 2026
7c4fc70
refactor(prompt): use fnUntraced for Effect callback generators
kitlangton Mar 30, 2026
1497bd5
refactor(prompt): use ToolRegistry service directly instead of facade
kitlangton Mar 30, 2026
0f31215
refactor(tool): extract Tool.Def type, use Option.isNone in prompt
kitlangton Mar 30, 2026
286efbc
Merge branch 'dev' into worktree-effectify-session-prompt
kitlangton Mar 30, 2026
766a250
refactor(prompt): replace Filesystem util with effectified fsys service
kitlangton Mar 30, 2026
c0aae41
fix(test): skip shell-spawning prompt tests on Windows
kitlangton Mar 30, 2026
3d165a6
refactor(prompt): convert lastModelImpl to Effect, remove unused Fiber
kitlangton Mar 30, 2026
77c01aa
refactor(prompt): use Truncate service directly instead of facade
kitlangton Mar 30, 2026
84368fd
refactor(prompt): simplify finalizer forEach with discard
kitlangton Mar 30, 2026
0045a3e
fix(prompt): finalize subtask tool state on cancellation
kitlangton Mar 30, 2026
5d10a4b
refactor(prompt): simplify subtask cancellation to onInterrupt
kitlangton Mar 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 216 additions & 0 deletions packages/opencode/src/effect/runner.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
import { Cause, Deferred, Effect, Exit, Fiber, Option, Schema, Scope, SynchronizedRef } from "effect"

export interface Runner<A, E = never> {
readonly state: Runner.State<A, E>
readonly busy: boolean
readonly ensureRunning: (work: Effect.Effect<A, E>) => Effect.Effect<A, E>
readonly startShell: (work: (signal: AbortSignal) => Effect.Effect<A, E>) => Effect.Effect<A, E>
readonly cancel: Effect.Effect<void>
}

export namespace Runner {
export class Cancelled extends Schema.TaggedErrorClass<Cancelled>()("RunnerCancelled", {}) {}

interface RunHandle<A, E> {
id: number
done: Deferred.Deferred<A, E | Cancelled>
fiber: Fiber.Fiber<A, E>
}

interface ShellHandle<A, E> {
id: number
fiber: Fiber.Fiber<A, E>
abort: AbortController
}

interface PendingHandle<A, E> {
id: number
done: Deferred.Deferred<A, E | Cancelled>
work: Effect.Effect<A, E>
}

export type State<A, E> =
| { readonly _tag: "Idle" }
| { readonly _tag: "Running"; readonly run: RunHandle<A, E> }
| { readonly _tag: "Shell"; readonly shell: ShellHandle<A, E> }
| { readonly _tag: "ShellThenRun"; readonly shell: ShellHandle<A, E>; readonly run: PendingHandle<A, E> }

export const make = <A, E = never>(
scope: Scope.Scope,
opts?: {
onIdle?: Effect.Effect<void>
onBusy?: Effect.Effect<void>
onInterrupt?: Effect.Effect<A, E>
busy?: () => never
},
): Runner<A, E> => {
const ref = SynchronizedRef.makeUnsafe<State<A, E>>({ _tag: "Idle" })
const idle = opts?.onIdle ?? Effect.void
const busy = opts?.onBusy ?? Effect.void
const onInterrupt = opts?.onInterrupt
let ids = 0

const state = () => SynchronizedRef.getUnsafe(ref)
const next = () => {
ids += 1
return ids
}

const complete = (done: Deferred.Deferred<A, E | Cancelled>, exit: Exit.Exit<A, E>) =>
Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)
? Deferred.fail(done, new Cancelled()).pipe(Effect.asVoid)
: Deferred.done(done, exit).pipe(Effect.asVoid)

const idleIfCurrent = () =>
SynchronizedRef.modify(ref, (st) => [st._tag === "Idle" ? idle : Effect.void, st] as const).pipe(Effect.flatten)

const finishRun = (id: number, done: Deferred.Deferred<A, E | Cancelled>, exit: Exit.Exit<A, E>) =>
SynchronizedRef.modify(
ref,
(st) =>
[
Effect.gen(function* () {
if (st._tag === "Running" && st.run.id === id) yield* idle
yield* complete(done, exit)
}),
st._tag === "Running" && st.run.id === id ? ({ _tag: "Idle" } as const) : st,
] as const,
).pipe(Effect.flatten)

const startRun = (work: Effect.Effect<A, E>, done: Deferred.Deferred<A, E | Cancelled>) =>
Effect.gen(function* () {
const id = next()
const fiber = yield* work.pipe(
Effect.onExit((exit) => finishRun(id, done, exit)),
Effect.forkIn(scope),
)
return { id, done, fiber } satisfies RunHandle<A, E>
})

const finishShell = (id: number) =>
SynchronizedRef.modifyEffect(
ref,
Effect.fnUntraced(function* (st) {
if (st._tag === "Shell" && st.shell.id === id) return [idle, { _tag: "Idle" }] as const
if (st._tag === "ShellThenRun" && st.shell.id === id) {
const run = yield* startRun(st.run.work, st.run.done)
return [Effect.void, { _tag: "Running", run }] as const
}
return [Effect.void, st] as const
}),
).pipe(Effect.flatten)

const stopShell = (shell: ShellHandle<A, E>) =>
Effect.gen(function* () {
shell.abort.abort()
const exit = yield* Fiber.await(shell.fiber).pipe(Effect.timeoutOption("100 millis"))
if (Option.isNone(exit)) yield* Fiber.interrupt(shell.fiber)
yield* Fiber.await(shell.fiber).pipe(Effect.exit, Effect.asVoid)
})

const ensureRunning = (work: Effect.Effect<A, E>) =>
SynchronizedRef.modifyEffect(
ref,
Effect.fnUntraced(function* (st) {
switch (st._tag) {
case "Running":
case "ShellThenRun":
return [Deferred.await(st.run.done), st] as const
case "Shell": {
const run = {
id: next(),
done: yield* Deferred.make<A, E | Cancelled>(),
work,
} satisfies PendingHandle<A, E>
return [Deferred.await(run.done), { _tag: "ShellThenRun", shell: st.shell, run }] as const
}
case "Idle": {
const done = yield* Deferred.make<A, E | Cancelled>()
const run = yield* startRun(work, done)
return [Deferred.await(done), { _tag: "Running", run }] as const
}
}
}),
).pipe(
Effect.flatten,
Effect.catch((e): Effect.Effect<A, E> =>
e instanceof Cancelled ? (onInterrupt ?? Effect.die(e)) : Effect.fail(e as E),
),
)

const startShell = (work: (signal: AbortSignal) => Effect.Effect<A, E>) =>
SynchronizedRef.modifyEffect(
ref,
Effect.fnUntraced(function* (st) {
if (st._tag !== "Idle") {
return [
Effect.sync(() => {
if (opts?.busy) opts.busy()
throw new Error("Runner is busy")
}),
st,
] as const
}
yield* busy
const id = next()
const abort = new AbortController()
const fiber = yield* work(abort.signal).pipe(Effect.ensuring(finishShell(id)), Effect.forkChild)
const shell = { id, fiber, abort } satisfies ShellHandle<A, E>
return [
Effect.gen(function* () {
const exit = yield* Fiber.await(fiber)
if (Exit.isSuccess(exit)) return exit.value
if (Cause.hasInterruptsOnly(exit.cause) && onInterrupt) return yield* onInterrupt
return yield* Effect.failCause(exit.cause)
}),
{ _tag: "Shell", shell },
] as const
}),
).pipe(Effect.flatten)

const cancel = SynchronizedRef.modify(ref, (st) => {
switch (st._tag) {
case "Idle":
return [Effect.void, st] as const
case "Running":
return [
Effect.gen(function* () {
yield* Fiber.interrupt(st.run.fiber)
yield* Deferred.await(st.run.done).pipe(Effect.exit, Effect.asVoid)
yield* idleIfCurrent()
}),
{ _tag: "Idle" } as const,
] as const
case "Shell":
return [
Effect.gen(function* () {
yield* stopShell(st.shell)
yield* idleIfCurrent()
}),
{ _tag: "Idle" } as const,
] as const
case "ShellThenRun":
return [
Effect.gen(function* () {
yield* Deferred.fail(st.run.done, new Cancelled()).pipe(Effect.asVoid)
yield* stopShell(st.shell)
yield* idleIfCurrent()
}),
{ _tag: "Idle" } as const,
] as const
}
}).pipe(Effect.flatten)

return {
get state() {
return state()
},
get busy() {
return state()._tag !== "Idle"
},
ensureRunning,
startShell,
cancel,
}
}
}
4 changes: 2 additions & 2 deletions packages/opencode/src/server/routes/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ export const SessionRoutes = lazy(() =>
}),
),
async (c) => {
SessionPrompt.cancel(c.req.valid("param").sessionID)
await SessionPrompt.cancel(c.req.valid("param").sessionID)
return c.json(true)
},
)
Expand Down Expand Up @@ -699,7 +699,7 @@ export const SessionRoutes = lazy(() =>
),
async (c) => {
const params = c.req.valid("param")
SessionPrompt.assertNotBusy(params.sessionID)
await SessionPrompt.assertNotBusy(params.sessionID)
await Session.removeMessage({
sessionID: params.sessionID,
messageID: params.messageID,
Expand Down
64 changes: 29 additions & 35 deletions packages/opencode/src/session/compaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { Plugin } from "@/plugin"
import { Config } from "@/config/config"
import { NotFoundError } from "@/storage/db"
import { ModelID, ProviderID } from "@/provider/schema"
import { Cause, Effect, Exit, Layer, ServiceMap } from "effect"
import { Effect, Layer, ServiceMap } from "effect"
import { makeRuntime } from "@/effect/run-service"
import { isOverflow as overflow } from "./overflow"

Expand Down Expand Up @@ -45,7 +45,6 @@ export namespace SessionCompaction {
parentID: MessageID
messages: MessageV2.WithParts[]
sessionID: SessionID
abort: AbortSignal
auto: boolean
overflow?: boolean
}) => Effect.Effect<"continue" | "stop">
Expand Down Expand Up @@ -135,20 +134,28 @@ export namespace SessionCompaction {
parentID: MessageID
messages: MessageV2.WithParts[]
sessionID: SessionID
abort: AbortSignal
auto: boolean
overflow?: boolean
}) {
const userMessage = input.messages.findLast((m) => m.info.id === input.parentID)!.info as MessageV2.User
const parent = input.messages.findLast((m) => m.info.id === input.parentID)
if (!parent || parent.info.role !== "user") {
throw new Error(`Compaction parent must be a user message: ${input.parentID}`)
}
const userMessage = parent.info

let messages = input.messages
let replay: MessageV2.WithParts | undefined
let replay:
| {
info: MessageV2.User
parts: MessageV2.Part[]
}
| undefined
if (input.overflow) {
const idx = input.messages.findIndex((m) => m.info.id === input.parentID)
for (let i = idx - 1; i >= 0; i--) {
const msg = input.messages[i]
if (msg.info.role === "user" && !msg.parts.some((p) => p.type === "compaction")) {
replay = msg
replay = { info: msg.info, parts: msg.parts }
messages = input.messages.slice(0, i)
break
}
Expand Down Expand Up @@ -206,7 +213,7 @@ When constructing the summary, try to stick to this template:
const msgs = structuredClone(messages)
yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs })
const modelMessages = yield* Effect.promise(() => MessageV2.toModelMessages(msgs, model, { stripMedia: true }))
const msg = (yield* session.updateMessage({
const msg: MessageV2.Assistant = {
id: MessageID.ascending(),
role: "assistant",
parentID: input.parentID,
Expand All @@ -231,25 +238,17 @@ When constructing the summary, try to stick to this template:
time: {
created: Date.now(),
},
})) as MessageV2.Assistant
}
yield* session.updateMessage(msg)
const processor = yield* processors.create({
assistantMessage: msg,
sessionID: input.sessionID,
model,
abort: input.abort,
})
const cancel = Effect.fn("SessionCompaction.cancel")(function* () {
if (!input.abort.aborted || msg.time.completed) return
msg.error = msg.error ?? new MessageV2.AbortedError({ message: "Aborted" }).toObject()
msg.finish = msg.finish ?? "error"
msg.time.completed = Date.now()
yield* session.updateMessage(msg)
})
const result = yield* processor
.process({
user: userMessage,
agent,
abort: input.abort,
sessionID: input.sessionID,
tools: {},
system: [],
Expand All @@ -262,7 +261,7 @@ When constructing the summary, try to stick to this template:
],
model,
})
.pipe(Effect.ensuring(cancel()))
.pipe(Effect.onInterrupt(() => processor.abort()))

if (result === "compact") {
processor.message.error = new MessageV2.ContextOverflowError({
Expand All @@ -277,7 +276,7 @@ When constructing the summary, try to stick to this template:

if (result === "continue" && input.auto) {
if (replay) {
const original = replay.info as MessageV2.User
const original = replay.info
const replayMsg = yield* session.updateMessage({
id: MessageID.ascending(),
role: "user",
Expand Down Expand Up @@ -386,7 +385,7 @@ When constructing the summary, try to stick to this template:
),
)

const { runPromise, runPromiseExit } = makeRuntime(Service, defaultLayer)
const { runPromise } = makeRuntime(Service, defaultLayer)

export async function isOverflow(input: { tokens: MessageV2.Assistant["tokens"]; model: Provider.Model }) {
return runPromise((svc) => svc.isOverflow(input))
Expand All @@ -396,21 +395,16 @@ When constructing the summary, try to stick to this template:
return runPromise((svc) => svc.prune(input))
}

export async function process(input: {
parentID: MessageID
messages: MessageV2.WithParts[]
sessionID: SessionID
abort: AbortSignal
auto: boolean
overflow?: boolean
}) {
const exit = await runPromiseExit((svc) => svc.process(input), { signal: input.abort })
if (Exit.isFailure(exit)) {
if (Cause.hasInterrupts(exit.cause) && input.abort.aborted) return "stop"
throw Cause.squash(exit.cause)
}
return exit.value
}
export const process = fn(
z.object({
parentID: MessageID.zod,
messages: z.custom<MessageV2.WithParts[]>(),
sessionID: SessionID.zod,
auto: z.boolean(),
overflow: z.boolean().optional(),
}),
(input) => runPromise((svc) => svc.process(input)),
)

export const create = fn(
z.object({
Expand Down
Loading
Loading