diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index db6327c82e30..2a841920d9e0 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -90,8 +90,9 @@ export namespace Bus { if (ps) yield* PubSub.publish(ps, payload) yield* PubSub.publish(state.wildcard, payload) + const dir = yield* InstanceState.directory GlobalBus.emit("event", { - directory: Instance.directory, + directory: dir, payload, }) }) diff --git a/packages/opencode/src/config/config.ts b/packages/opencode/src/config/config.ts index 9e56c980fbeb..f86d8d32af60 100644 --- a/packages/opencode/src/config/config.ts +++ b/packages/opencode/src/config/config.ts @@ -1486,7 +1486,8 @@ export namespace Config { }) const update = Effect.fn("Config.update")(function* (config: Info) { - const file = path.join(Instance.directory, "config.json") + const dir = yield* InstanceState.directory + const file = path.join(dir, "config.json") const existing = yield* loadFile(file) yield* fs.writeFileString(file, JSON.stringify(mergeDeep(existing, config), null, 2)).pipe(Effect.orDie) yield* Effect.promise(() => Instance.dispose()) diff --git a/packages/opencode/src/effect/instance-ref.ts b/packages/opencode/src/effect/instance-ref.ts new file mode 100644 index 000000000000..d3939b2640ad --- /dev/null +++ b/packages/opencode/src/effect/instance-ref.ts @@ -0,0 +1,6 @@ +import { ServiceMap } from "effect" +import type { InstanceContext } from "@/project/instance" + +export const InstanceRef = ServiceMap.Reference("~opencode/InstanceRef", { + defaultValue: () => undefined, +}) diff --git a/packages/opencode/src/effect/instance-state.ts b/packages/opencode/src/effect/instance-state.ts index 6873ec255c96..b073cf0a4b53 100644 --- a/packages/opencode/src/effect/instance-state.ts +++ b/packages/opencode/src/effect/instance-state.ts @@ -1,5 +1,7 @@ -import { Effect, ScopedCache, Scope } from "effect" +import { Effect, Fiber, ScopedCache, Scope, ServiceMap } from "effect" import { Instance, type InstanceContext } from "@/project/instance" +import { Context } from "@/util/context" +import { InstanceRef } from "./instance-ref" import { registerDisposer } from "./instance-registry" const TypeId = "~opencode/InstanceState" @@ -10,13 +12,34 @@ export interface InstanceState { } export namespace InstanceState { + export const bind = any>(fn: F): F => { + try { + return Instance.bind(fn) + } catch (err) { + if (!(err instanceof Context.NotFound)) throw err + } + const fiber = Fiber.getCurrent() + const ctx = fiber ? ServiceMap.getReferenceUnsafe(fiber.services, InstanceRef) : undefined + if (!ctx) return fn + return ((...args: any[]) => Instance.restore(ctx, () => fn(...args))) as F + } + + export const context = Effect.fnUntraced(function* () { + return (yield* InstanceRef) ?? Instance.current + })() + + export const directory = Effect.map(context, (ctx) => ctx.directory) + export const make = ( init: (ctx: InstanceContext) => Effect.Effect, ): Effect.Effect>, never, R | Scope.Scope> => Effect.gen(function* () { const cache = yield* ScopedCache.make({ capacity: Number.POSITIVE_INFINITY, - lookup: () => init(Instance.current), + lookup: () => + Effect.fnUntraced(function* () { + return yield* init(yield* context) + })(), }) const off = registerDisposer((directory) => Effect.runPromise(ScopedCache.invalidate(cache, directory))) @@ -29,7 +52,9 @@ export namespace InstanceState { }) export const get = (self: InstanceState) => - Effect.suspend(() => ScopedCache.get(self.cache, Instance.directory)) + Effect.gen(function* () { + return yield* ScopedCache.get(self.cache, yield* directory) + }) export const use = (self: InstanceState, select: (value: A) => B) => Effect.map(get(self), select) @@ -40,8 +65,18 @@ export namespace InstanceState { ) => Effect.flatMap(get(self), select) export const has = (self: InstanceState) => - Effect.suspend(() => ScopedCache.has(self.cache, Instance.directory)) + Effect.gen(function* () { + return yield* ScopedCache.has(self.cache, yield* directory) + }) export const invalidate = (self: InstanceState) => - Effect.suspend(() => ScopedCache.invalidate(self.cache, Instance.directory)) + Effect.gen(function* () { + return yield* ScopedCache.invalidate(self.cache, yield* directory) + }) + + /** + * Effect finalizers run on the fiber scheduler after the original async + * boundary, so ALS reads like Instance.directory can be gone by then. + */ + export const withALS = (fn: () => T) => Effect.map(context, (ctx) => Instance.restore(ctx, fn)) } diff --git a/packages/opencode/src/effect/run-service.ts b/packages/opencode/src/effect/run-service.ts index 2daa29fde8f6..619d5be6b5d7 100644 --- a/packages/opencode/src/effect/run-service.ts +++ b/packages/opencode/src/effect/run-service.ts @@ -1,19 +1,33 @@ import { Effect, Layer, ManagedRuntime } from "effect" import * as ServiceMap from "effect/ServiceMap" +import { Instance } from "@/project/instance" +import { Context } from "@/util/context" +import { InstanceRef } from "./instance-ref" export const memoMap = Layer.makeMemoMapUnsafe() +function attach(effect: Effect.Effect): Effect.Effect { + try { + const ctx = Instance.current + return Effect.provideService(effect, InstanceRef, ctx) + } catch (err) { + if (!(err instanceof Context.NotFound)) throw err + } + return effect +} + export function makeRuntime(service: ServiceMap.Service, layer: Layer.Layer) { let rt: ManagedRuntime.ManagedRuntime | undefined const getRuntime = () => (rt ??= ManagedRuntime.make(layer, { memoMap })) return { - runSync: (fn: (svc: S) => Effect.Effect) => getRuntime().runSync(service.use(fn)), + runSync: (fn: (svc: S) => Effect.Effect) => getRuntime().runSync(attach(service.use(fn))), runPromiseExit: (fn: (svc: S) => Effect.Effect, options?: Effect.RunOptions) => - getRuntime().runPromiseExit(service.use(fn), options), + getRuntime().runPromiseExit(attach(service.use(fn)), options), runPromise: (fn: (svc: S) => Effect.Effect, options?: Effect.RunOptions) => - getRuntime().runPromise(service.use(fn), options), - runFork: (fn: (svc: S) => Effect.Effect) => getRuntime().runFork(service.use(fn)), - runCallback: (fn: (svc: S) => Effect.Effect) => getRuntime().runCallback(service.use(fn)), + getRuntime().runPromise(attach(service.use(fn)), options), + runFork: (fn: (svc: S) => Effect.Effect) => getRuntime().runFork(attach(service.use(fn))), + runCallback: (fn: (svc: S) => Effect.Effect) => + getRuntime().runCallback(attach(service.use(fn))), } } diff --git a/packages/opencode/src/format/index.ts b/packages/opencode/src/format/index.ts index 8def2487578e..795364be1c57 100644 --- a/packages/opencode/src/format/index.ts +++ b/packages/opencode/src/format/index.ts @@ -108,10 +108,11 @@ export namespace Format { for (const item of yield* Effect.promise(() => getFormatter(ext))) { log.info("running", { command: item.command }) const cmd = item.command.map((x) => x.replace("$FILE", filepath)) + const dir = yield* InstanceState.directory const code = yield* spawner .spawn( ChildProcess.make(cmd[0]!, cmd.slice(1), { - cwd: Instance.directory, + cwd: dir, env: item.environment, extendEnv: true, }), diff --git a/packages/opencode/src/installation/index.ts b/packages/opencode/src/installation/index.ts index 52c149c4fd9d..232fa14f542c 100644 --- a/packages/opencode/src/installation/index.ts +++ b/packages/opencode/src/installation/index.ts @@ -9,11 +9,7 @@ import z from "zod" import { BusEvent } from "@/bus/bus-event" import { Flag } from "../flag/flag" import { Log } from "../util/log" - -declare global { - const OPENCODE_VERSION: string - const OPENCODE_CHANNEL: string -} +import { CHANNEL as channel, VERSION as version } from "./meta" import semver from "semver" @@ -60,8 +56,8 @@ export namespace Installation { }) export type Info = z.infer - export const VERSION = typeof OPENCODE_VERSION === "string" ? OPENCODE_VERSION : "local" - export const CHANNEL = typeof OPENCODE_CHANNEL === "string" ? OPENCODE_CHANNEL : "local" + export const VERSION = version + export const CHANNEL = channel export const USER_AGENT = `opencode/${CHANNEL}/${VERSION}/${Flag.OPENCODE_CLIENT}` export function isPreview() { diff --git a/packages/opencode/src/installation/meta.ts b/packages/opencode/src/installation/meta.ts new file mode 100644 index 000000000000..6a1315db27cf --- /dev/null +++ b/packages/opencode/src/installation/meta.ts @@ -0,0 +1,7 @@ +declare global { + const OPENCODE_VERSION: string + const OPENCODE_CHANNEL: string +} + +export const VERSION = typeof OPENCODE_VERSION === "string" ? OPENCODE_VERSION : "local" +export const CHANNEL = typeof OPENCODE_CHANNEL === "string" ? OPENCODE_CHANNEL : "local" diff --git a/packages/opencode/src/project/instance.ts b/packages/opencode/src/project/instance.ts index 5dddfe627fbc..a0d6f2414a85 100644 --- a/packages/opencode/src/project/instance.ts +++ b/packages/opencode/src/project/instance.ts @@ -114,6 +114,14 @@ export const Instance = { const ctx = context.use() return ((...args: any[]) => context.provide(ctx, () => fn(...args))) as F }, + /** + * Run a synchronous function within the given instance context ALS. + * Use this to bridge from Effect (where InstanceRef carries context) + * back to sync code that reads Instance.directory from ALS. + */ + restore(ctx: InstanceContext, fn: () => R): R { + return context.provide(ctx, fn) + }, state(init: () => S, dispose?: (state: Awaited) => Promise): () => S { return State.create(() => Instance.directory, init, dispose) }, diff --git a/packages/opencode/src/session/compaction.ts b/packages/opencode/src/session/compaction.ts index 229dff0c46de..02a8d9484514 100644 --- a/packages/opencode/src/session/compaction.ts +++ b/packages/opencode/src/session/compaction.ts @@ -17,6 +17,7 @@ import { NotFoundError } from "@/storage/db" import { ModelID, ProviderID } from "@/provider/schema" import { Effect, Layer, ServiceMap } from "effect" import { makeRuntime } from "@/effect/run-service" +import { InstanceState } from "@/effect/instance-state" import { isOverflow as overflow } from "./overflow" export namespace SessionCompaction { @@ -213,6 +214,7 @@ When constructing the summary, try to stick to this template: const msgs = structuredClone(messages) yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs }) const modelMessages = yield* Effect.promise(() => MessageV2.toModelMessages(msgs, model, { stripMedia: true })) + const ctx = yield* InstanceState.context const msg: MessageV2.Assistant = { id: MessageID.ascending(), role: "assistant", @@ -223,8 +225,8 @@ When constructing the summary, try to stick to this template: variant: userMessage.variant, summary: true, path: { - cwd: Instance.directory, - root: Instance.worktree, + cwd: ctx.directory, + root: ctx.worktree, }, cost: 0, tokens: { diff --git a/packages/opencode/src/session/index.ts b/packages/opencode/src/session/index.ts index 94aee14c09c4..5ed5acafaf54 100644 --- a/packages/opencode/src/session/index.ts +++ b/packages/opencode/src/session/index.ts @@ -19,6 +19,7 @@ import { Log } from "../util/log" import { updateSchema } from "../util/update-schema" import { MessageV2 } from "./message-v2" import { Instance } from "../project/instance" +import { InstanceState } from "@/effect/instance-state" import { SessionPrompt } from "./prompt" import { fn } from "@/util/fn" import { Command } from "../command" @@ -382,11 +383,12 @@ export namespace Session { directory: string permission?: Permission.Ruleset }) { + const ctx = yield* InstanceState.context const result: Info = { id: SessionID.descending(input.id), slug: Slug.create(), version: Installation.VERSION, - projectID: Instance.project.id, + projectID: ctx.project.id, directory: input.directory, workspaceID: input.workspaceID, parentID: input.parentID, @@ -444,12 +446,12 @@ export namespace Session { }) const children = Effect.fn("Session.children")(function* (parentID: SessionID) { - const project = Instance.project + const ctx = yield* InstanceState.context const rows = yield* db((d) => d .select() .from(SessionTable) - .where(and(eq(SessionTable.project_id, project.id), eq(SessionTable.parent_id, parentID))) + .where(and(eq(SessionTable.project_id, ctx.project.id), eq(SessionTable.parent_id, parentID))) .all(), ) return rows.map(fromRow) @@ -496,9 +498,10 @@ export namespace Session { permission?: Permission.Ruleset workspaceID?: WorkspaceID }) { + const directory = yield* InstanceState.directory return yield* createNext({ parentID: input?.parentID, - directory: Instance.directory, + directory, title: input?.title, permission: input?.permission, workspaceID: input?.workspaceID, @@ -506,10 +509,11 @@ export namespace Session { }) const fork = Effect.fn("Session.fork")(function* (input: { sessionID: SessionID; messageID?: MessageID }) { + const directory = yield* InstanceState.directory const original = yield* get(input.sessionID) const title = getForkedTitle(original.title) const session = yield* createNext({ - directory: Instance.directory, + directory, workspaceID: original.workspaceID, title, }) diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index dbf815bd6d79..083c23cc68f8 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -148,6 +148,7 @@ export namespace SessionPrompt { }) const resolvePromptParts = Effect.fn("SessionPrompt.resolvePromptParts")(function* (template: string) { + const ctx = yield* InstanceState.context const parts: PromptInput["parts"] = [{ type: "text", text: template }] const files = ConfigMarkdown.files(template) const seen = new Set() @@ -159,7 +160,7 @@ export namespace SessionPrompt { seen.add(name) const filepath = name.startsWith("~/") ? path.join(os.homedir(), name.slice(2)) - : path.resolve(Instance.worktree, name) + : path.resolve(ctx.worktree, name) const info = yield* fsys.stat(filepath).pipe(Effect.option) if (Option.isNone(info)) { @@ -553,6 +554,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the msgs: MessageV2.WithParts[] }) { const { task, model, lastUser, sessionID, session, msgs } = input + const ctx = yield* InstanceState.context const taskTool = yield* Effect.promise(() => TaskTool.init()) const taskModel = task.model ? yield* getModel(task.model.providerID, task.model.modelID, sessionID) : model const assistantMessage: MessageV2.Assistant = yield* sessions.updateMessage({ @@ -563,7 +565,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the mode: task.agent, agent: task.agent, variant: lastUser.variant, - path: { cwd: Instance.directory, root: Instance.worktree }, + path: { cwd: ctx.directory, root: ctx.worktree }, cost: 0, tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, modelID: taskModel.id, @@ -734,6 +736,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the }) const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput, signal: AbortSignal) { + const ctx = yield* InstanceState.context const session = yield* sessions.get(input.sessionID) if (session.revert) { yield* Effect.promise(() => SessionRevert.cleanup(session)) @@ -773,7 +776,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the mode: input.agent, agent: input.agent, cost: 0, - path: { cwd: Instance.directory, root: Instance.worktree }, + path: { cwd: ctx.directory, root: ctx.worktree }, time: { created: Date.now() }, role: "assistant", tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, @@ -832,7 +835,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the } const args = (invocations[shellName] ?? invocations[""]).args - const cwd = Instance.directory + const cwd = ctx.directory const shellEnv = yield* plugin.trigger( "shell.env", { cwd, sessionID: input.sessionID, callID: part.callID }, @@ -976,7 +979,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the variant, } - yield* Effect.addFinalizer(() => Effect.sync(() => InstructionPrompt.clear(info.id))) + yield* Effect.addFinalizer(() => InstanceState.withALS(() => InstructionPrompt.clear(info.id))) type Draft = T extends MessageV2.Part ? Omit & { id?: string } : never const assign = (part: Draft): MessageV2.Part => ({ @@ -1330,6 +1333,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the const runLoop: (sessionID: SessionID) => Effect.Effect = Effect.fn("SessionPrompt.run")( function* (sessionID: SessionID) { + const ctx = yield* InstanceState.context let structured: unknown | undefined let step = 0 const session = yield* sessions.get(sessionID) @@ -1421,7 +1425,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the mode: agent.name, agent: agent.name, variant: lastUser.variant, - path: { cwd: Instance.directory, root: Instance.worktree }, + path: { cwd: ctx.directory, root: ctx.worktree }, cost: 0, tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, modelID: model.id, @@ -1538,7 +1542,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the }), Effect.fnUntraced(function* (exit) { if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) yield* handle.abort() - InstructionPrompt.clear(handle.message.id) + yield* InstanceState.withALS(() => InstructionPrompt.clear(handle.message.id)) }), ) if (outcome === "break") break diff --git a/packages/opencode/src/storage/db.ts b/packages/opencode/src/storage/db.ts index f41a1ecd8549..4cb0dbc3e184 100644 --- a/packages/opencode/src/storage/db.ts +++ b/packages/opencode/src/storage/db.ts @@ -10,8 +10,9 @@ import { NamedError } from "@opencode-ai/util/error" import z from "zod" import path from "path" import { readFileSync, readdirSync, existsSync } from "fs" -import { Installation } from "../installation" import { Flag } from "../flag/flag" +import { CHANNEL } from "../installation/meta" +import { InstanceState } from "@/effect/instance-state" import { iife } from "@/util/iife" import { init } from "#db" @@ -28,10 +29,9 @@ const log = Log.create({ service: "db" }) export namespace Database { export function getChannelPath() { - const channel = Installation.CHANNEL - if (["latest", "beta"].includes(channel) || Flag.OPENCODE_DISABLE_CHANNEL_DB) + if (["latest", "beta"].includes(CHANNEL) || Flag.OPENCODE_DISABLE_CHANNEL_DB) return path.join(Global.Path.data, "opencode.db") - const safe = channel.replace(/[^a-zA-Z0-9._-]/g, "-") + const safe = CHANNEL.replace(/[^a-zA-Z0-9._-]/g, "-") return path.join(Global.Path.data, `opencode-${safe}.db`) } @@ -142,10 +142,11 @@ export namespace Database { } export function effect(fn: () => any | Promise) { + const bound = InstanceState.bind(fn) try { - ctx.use().effects.push(fn) + ctx.use().effects.push(bound) } catch { - fn() + bound() } } @@ -162,12 +163,8 @@ export namespace Database { } catch (err) { if (err instanceof Context.NotFound) { const effects: (() => void | Promise)[] = [] - const result = Client().transaction( - (tx: TxOrDb) => { - return ctx.provide({ tx, effects }, () => callback(tx)) - }, - { behavior: options?.behavior }, - ) + const txCallback = InstanceState.bind((tx: TxOrDb) => ctx.provide({ tx, effects }, () => callback(tx))) + const result = Client().transaction(txCallback, { behavior: options?.behavior }) for (const effect of effects) effect() return result as NotPromise } diff --git a/packages/opencode/src/worktree/index.ts b/packages/opencode/src/worktree/index.ts index 7087ac262793..da20a5d6db1e 100644 --- a/packages/opencode/src/worktree/index.ts +++ b/packages/opencode/src/worktree/index.ts @@ -18,6 +18,7 @@ import { NodePath } from "@effect/platform-node" import { AppFileSystem } from "@/filesystem" import { makeRuntime } from "@/effect/run-service" import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner" +import { InstanceState } from "@/effect/instance-state" export namespace Worktree { const log = Log.create({ service: "worktree" }) @@ -199,6 +200,7 @@ export namespace Worktree { const MAX_NAME_ATTEMPTS = 26 const candidate = Effect.fn("Worktree.candidate")(function* (root: string, base?: string) { + const ctx = yield* InstanceState.context for (const attempt of Array.from({ length: MAX_NAME_ATTEMPTS }, (_, i) => i)) { const name = base ? (attempt === 0 ? base : `${base}-${Slug.create()}`) : Slug.create() const branch = `opencode/${name}` @@ -207,7 +209,7 @@ export namespace Worktree { if (yield* fs.exists(directory).pipe(Effect.orDie)) continue const ref = `refs/heads/${branch}` - const branchCheck = yield* git(["show-ref", "--verify", "--quiet", ref], { cwd: Instance.worktree }) + const branchCheck = yield* git(["show-ref", "--verify", "--quiet", ref], { cwd: ctx.worktree }) if (branchCheck.code === 0) continue return Info.parse({ name, branch, directory }) @@ -216,11 +218,12 @@ export namespace Worktree { }) const makeWorktreeInfo = Effect.fn("Worktree.makeWorktreeInfo")(function* (name?: string) { - if (Instance.project.vcs !== "git") { + const ctx = yield* InstanceState.context + if (ctx.project.vcs !== "git") { throw new NotGitError({ message: "Worktrees are only supported for git projects" }) } - const root = pathSvc.join(Global.Path.data, "worktree", Instance.project.id) + const root = pathSvc.join(Global.Path.data, "worktree", ctx.project.id) yield* fs.makeDirectory(root, { recursive: true }).pipe(Effect.orDie) const base = name ? slugify(name) : "" @@ -228,18 +231,20 @@ export namespace Worktree { }) const setup = Effect.fnUntraced(function* (info: Info) { + const ctx = yield* InstanceState.context const created = yield* git(["worktree", "add", "--no-checkout", "-b", info.branch, info.directory], { - cwd: Instance.worktree, + cwd: ctx.worktree, }) if (created.code !== 0) { throw new CreateFailedError({ message: created.stderr || created.text || "Failed to create git worktree" }) } - yield* project.addSandbox(Instance.project.id, info.directory).pipe(Effect.catch(() => Effect.void)) + yield* project.addSandbox(ctx.project.id, info.directory).pipe(Effect.catch(() => Effect.void)) }) const boot = Effect.fnUntraced(function* (info: Info, startCommand?: string) { - const projectID = Instance.project.id + const ctx = yield* InstanceState.context + const projectID = ctx.project.id const extra = startCommand?.trim() const populated = yield* git(["reset", "--hard"], { cwd: info.directory }) diff --git a/packages/opencode/test/account/repo.test.ts b/packages/opencode/test/account/repo.test.ts index fb12ddf701f2..460c47443f3b 100644 --- a/packages/opencode/test/account/repo.test.ts +++ b/packages/opencode/test/account/repo.test.ts @@ -16,21 +16,21 @@ const truncate = Layer.effectDiscard( const it = testEffect(Layer.merge(AccountRepo.layer, truncate)) -it.effect("list returns empty when no accounts exist", () => +it.live("list returns empty when no accounts exist", () => Effect.gen(function* () { const accounts = yield* AccountRepo.use((r) => r.list()) expect(accounts).toEqual([]) }), ) -it.effect("active returns none when no accounts exist", () => +it.live("active returns none when no accounts exist", () => Effect.gen(function* () { const active = yield* AccountRepo.use((r) => r.active()) expect(Option.isNone(active)).toBe(true) }), ) -it.effect("persistAccount inserts and getRow retrieves", () => +it.live("persistAccount inserts and getRow retrieves", () => Effect.gen(function* () { const id = AccountID.make("user-1") yield* AccountRepo.use((r) => @@ -56,7 +56,7 @@ it.effect("persistAccount inserts and getRow retrieves", () => }), ) -it.effect("persistAccount sets the active account and org", () => +it.live("persistAccount sets the active account and org", () => Effect.gen(function* () { const id1 = AccountID.make("user-1") const id2 = AccountID.make("user-2") @@ -93,7 +93,7 @@ it.effect("persistAccount sets the active account and org", () => }), ) -it.effect("list returns all accounts", () => +it.live("list returns all accounts", () => Effect.gen(function* () { const id1 = AccountID.make("user-1") const id2 = AccountID.make("user-2") @@ -128,7 +128,7 @@ it.effect("list returns all accounts", () => }), ) -it.effect("remove deletes an account", () => +it.live("remove deletes an account", () => Effect.gen(function* () { const id = AccountID.make("user-1") @@ -151,7 +151,7 @@ it.effect("remove deletes an account", () => }), ) -it.effect("use stores the selected org and marks the account active", () => +it.live("use stores the selected org and marks the account active", () => Effect.gen(function* () { const id1 = AccountID.make("user-1") const id2 = AccountID.make("user-2") @@ -191,7 +191,7 @@ it.effect("use stores the selected org and marks the account active", () => }), ) -it.effect("persistToken updates token fields", () => +it.live("persistToken updates token fields", () => Effect.gen(function* () { const id = AccountID.make("user-1") @@ -225,7 +225,7 @@ it.effect("persistToken updates token fields", () => }), ) -it.effect("persistToken with no expiry sets token_expiry to null", () => +it.live("persistToken with no expiry sets token_expiry to null", () => Effect.gen(function* () { const id = AccountID.make("user-1") @@ -255,7 +255,7 @@ it.effect("persistToken with no expiry sets token_expiry to null", () => }), ) -it.effect("persistAccount upserts on conflict", () => +it.live("persistAccount upserts on conflict", () => Effect.gen(function* () { const id = AccountID.make("user-1") @@ -295,7 +295,7 @@ it.effect("persistAccount upserts on conflict", () => }), ) -it.effect("remove clears active state when deleting the active account", () => +it.live("remove clears active state when deleting the active account", () => Effect.gen(function* () { const id = AccountID.make("user-1") @@ -318,7 +318,7 @@ it.effect("remove clears active state when deleting the active account", () => }), ) -it.effect("getRow returns none for nonexistent account", () => +it.live("getRow returns none for nonexistent account", () => Effect.gen(function* () { const row = yield* AccountRepo.use((r) => r.getRow(AccountID.make("nope"))) expect(Option.isNone(row)).toBe(true) diff --git a/packages/opencode/test/account/service.test.ts b/packages/opencode/test/account/service.test.ts index 9c67641d2089..cfe55e23e4f6 100644 --- a/packages/opencode/test/account/service.test.ts +++ b/packages/opencode/test/account/service.test.ts @@ -54,7 +54,7 @@ const deviceTokenClient = (body: unknown, status = 400) => const poll = (body: unknown, status = 400) => Account.Service.use((s) => s.poll(login())).pipe(Effect.provide(live(deviceTokenClient(body, status)))) -it.effect("orgsByAccount groups orgs per account", () => +it.live("orgsByAccount groups orgs per account", () => Effect.gen(function* () { yield* AccountRepo.use((r) => r.persistAccount({ @@ -107,7 +107,7 @@ it.effect("orgsByAccount groups orgs per account", () => }), ) -it.effect("token refresh persists the new token", () => +it.live("token refresh persists the new token", () => Effect.gen(function* () { const id = AccountID.make("user-1") @@ -148,7 +148,7 @@ it.effect("token refresh persists the new token", () => }), ) -it.effect("config sends the selected org header", () => +it.live("config sends the selected org header", () => Effect.gen(function* () { const id = AccountID.make("user-1") @@ -188,7 +188,7 @@ it.effect("config sends the selected org header", () => }), ) -it.effect("poll stores the account and first org on success", () => +it.live("poll stores the account and first org on success", () => Effect.gen(function* () { const client = HttpClient.make((req) => Effect.succeed( @@ -259,7 +259,7 @@ for (const [name, body, expectedTag] of [ "PollExpired", ], ] as const) { - it.effect(`poll returns ${name} for ${body.error}`, () => + it.live(`poll returns ${name} for ${body.error}`, () => Effect.gen(function* () { const result = yield* poll(body) expect(result._tag).toBe(expectedTag) @@ -267,7 +267,7 @@ for (const [name, body, expectedTag] of [ ) } -it.effect("poll returns poll error for other OAuth errors", () => +it.live("poll returns poll error for other OAuth errors", () => Effect.gen(function* () { const result = yield* poll({ error: "server_error", diff --git a/packages/opencode/test/bus/bus-effect.test.ts b/packages/opencode/test/bus/bus-effect.test.ts index 642763e90fdd..6f3bcbcfab82 100644 --- a/packages/opencode/test/bus/bus-effect.test.ts +++ b/packages/opencode/test/bus/bus-effect.test.ts @@ -22,7 +22,7 @@ const live = Layer.mergeAll(Bus.layer, node) const it = testEffect(live) describe("Bus (Effect-native)", () => { - it.effect("publish + subscribe stream delivers events", () => + it.live("publish + subscribe stream delivers events", () => provideTmpdirInstance(() => Effect.gen(function* () { const bus = yield* Bus.Service @@ -46,7 +46,7 @@ describe("Bus (Effect-native)", () => { ), ) - it.effect("subscribe filters by event type", () => + it.live("subscribe filters by event type", () => provideTmpdirInstance(() => Effect.gen(function* () { const bus = yield* Bus.Service @@ -70,7 +70,7 @@ describe("Bus (Effect-native)", () => { ), ) - it.effect("subscribeAll receives all types", () => + it.live("subscribeAll receives all types", () => provideTmpdirInstance(() => Effect.gen(function* () { const bus = yield* Bus.Service @@ -95,7 +95,7 @@ describe("Bus (Effect-native)", () => { ), ) - it.effect("multiple subscribers each receive the event", () => + it.live("multiple subscribers each receive the event", () => provideTmpdirInstance(() => Effect.gen(function* () { const bus = yield* Bus.Service @@ -129,7 +129,7 @@ describe("Bus (Effect-native)", () => { ), ) - it.effect("subscribeAll stream sees InstanceDisposed on disposal", () => + it.live("subscribeAll stream sees InstanceDisposed on disposal", () => Effect.gen(function* () { const dir = yield* tmpdirScoped() const types: string[] = [] diff --git a/packages/opencode/test/effect/instance-state.test.ts b/packages/opencode/test/effect/instance-state.test.ts index 2d527482ba11..914753312f02 100644 --- a/packages/opencode/test/effect/instance-state.test.ts +++ b/packages/opencode/test/effect/instance-state.test.ts @@ -1,6 +1,7 @@ import { afterEach, expect, test } from "bun:test" -import { Duration, Effect, Layer, ManagedRuntime, ServiceMap } from "effect" +import { Cause, Deferred, Duration, Effect, Exit, Fiber, Layer, ManagedRuntime, ServiceMap } from "effect" import { InstanceState } from "../../src/effect/instance-state" +import { InstanceRef } from "../../src/effect/instance-ref" import { Instance } from "../../src/project/instance" import { tmpdir } from "../fixture/fixture" @@ -382,3 +383,100 @@ test("InstanceState dedupes concurrent lookups", async () => { ), ) }) + +test("InstanceState survives deferred resume from the same instance context", async () => { + await using tmp = await tmpdir({ git: true }) + + interface Api { + readonly get: (gate: Deferred.Deferred) => Effect.Effect + } + + class Test extends ServiceMap.Service()("@test/DeferredResume") { + static readonly layer = Layer.effect( + Test, + Effect.gen(function* () { + const state = yield* InstanceState.make((ctx) => Effect.sync(() => ctx.directory)) + + return Test.of({ + get: Effect.fn("Test.get")(function* (gate: Deferred.Deferred) { + yield* Deferred.await(gate) + return yield* InstanceState.get(state) + }), + }) + }), + ) + } + + const rt = ManagedRuntime.make(Test.layer) + + try { + const gate = await Effect.runPromise(Deferred.make()) + const fiber = await Instance.provide({ + directory: tmp.path, + fn: () => Promise.resolve(rt.runFork(Test.use((svc) => svc.get(gate)))), + }) + + await Instance.provide({ + directory: tmp.path, + fn: () => Effect.runPromise(Deferred.succeed(gate, void 0)), + }) + const exit = await Effect.runPromise(Fiber.await(fiber)) + + expect(Exit.isSuccess(exit)).toBe(true) + if (Exit.isSuccess(exit)) { + expect(exit.value).toBe(tmp.path) + } + } finally { + await rt.dispose() + } +}) + +test("InstanceState survives deferred resume outside ALS when InstanceRef is set", async () => { + await using tmp = await tmpdir({ git: true }) + + interface Api { + readonly get: (gate: Deferred.Deferred) => Effect.Effect + } + + class Test extends ServiceMap.Service()("@test/DeferredResumeOutside") { + static readonly layer = Layer.effect( + Test, + Effect.gen(function* () { + const state = yield* InstanceState.make((ctx) => Effect.sync(() => ctx.directory)) + + return Test.of({ + get: Effect.fn("Test.get")(function* (gate: Deferred.Deferred) { + yield* Deferred.await(gate) + return yield* InstanceState.get(state) + }), + }) + }), + ) + } + + const rt = ManagedRuntime.make(Test.layer) + + try { + const gate = await Effect.runPromise(Deferred.make()) + // Provide InstanceRef so the fiber carries the context even when + // the deferred is resolved from outside Instance.provide ALS. + const fiber = await Instance.provide({ + directory: tmp.path, + fn: () => + Promise.resolve( + rt.runFork(Test.use((svc) => svc.get(gate)).pipe(Effect.provideService(InstanceRef, Instance.current))), + ), + }) + + // Resume from outside any Instance.provide — ALS is NOT set here + await Effect.runPromise(Deferred.succeed(gate, void 0)) + const exit = await Effect.runPromise(Fiber.await(fiber)) + + expect(Exit.isSuccess(exit)).toBe(true) + if (Exit.isSuccess(exit)) { + expect(exit.value).toBe(tmp.path) + } + } finally { + await rt.dispose() + } +}) diff --git a/packages/opencode/test/effect/runner.test.ts b/packages/opencode/test/effect/runner.test.ts index 5d3488849c59..9dc395876ee0 100644 --- a/packages/opencode/test/effect/runner.test.ts +++ b/packages/opencode/test/effect/runner.test.ts @@ -6,7 +6,7 @@ import { it } from "../lib/effect" describe("Runner", () => { // --- ensureRunning semantics --- - it.effect( + it.live( "ensureRunning starts work and returns result", Effect.gen(function* () { const s = yield* Scope.Scope @@ -18,7 +18,7 @@ describe("Runner", () => { }), ) - it.effect( + it.live( "ensureRunning propagates work failures", Effect.gen(function* () { const s = yield* Scope.Scope @@ -29,7 +29,7 @@ describe("Runner", () => { }), ) - it.effect( + it.live( "concurrent callers share the same run", Effect.gen(function* () { const s = yield* Scope.Scope @@ -51,7 +51,7 @@ describe("Runner", () => { }), ) - it.effect( + it.live( "concurrent callers all receive same error", Effect.gen(function* () { const s = yield* Scope.Scope @@ -71,7 +71,7 @@ describe("Runner", () => { }), ) - it.effect( + it.live( "ensureRunning can be called again after previous run completes", Effect.gen(function* () { const s = yield* Scope.Scope @@ -81,7 +81,7 @@ describe("Runner", () => { }), ) - it.effect( + it.live( "second ensureRunning ignores new work if already running", Effect.gen(function* () { const s = yield* Scope.Scope @@ -110,7 +110,7 @@ describe("Runner", () => { // --- cancel semantics --- - it.effect( + it.live( "cancel interrupts running work", Effect.gen(function* () { const s = yield* Scope.Scope @@ -128,7 +128,7 @@ describe("Runner", () => { }), ) - it.effect( + it.live( "cancel on idle is a no-op", Effect.gen(function* () { const s = yield* Scope.Scope @@ -138,7 +138,7 @@ describe("Runner", () => { }), ) - it.effect( + it.live( "cancel with onInterrupt resolves callers gracefully", Effect.gen(function* () { const s = yield* Scope.Scope @@ -154,7 +154,7 @@ describe("Runner", () => { }), ) - it.effect( + it.live( "cancel with queued callers resolves all", Effect.gen(function* () { const s = yield* Scope.Scope @@ -175,7 +175,7 @@ describe("Runner", () => { }), ) - it.effect( + it.live( "work can be started after cancel", Effect.gen(function* () { const s = yield* Scope.Scope @@ -245,7 +245,7 @@ describe("Runner", () => { // --- shell semantics --- - it.effect( + it.live( "shell runs exclusively", Effect.gen(function* () { const s = yield* Scope.Scope @@ -256,7 +256,7 @@ describe("Runner", () => { }), ) - it.effect( + it.live( "shell rejects when run is active", Effect.gen(function* () { const s = yield* Scope.Scope @@ -272,7 +272,7 @@ describe("Runner", () => { }), ) - it.effect( + it.live( "shell rejects when another shell is running", Effect.gen(function* () { const s = yield* Scope.Scope @@ -292,7 +292,7 @@ describe("Runner", () => { }), ) - it.effect( + it.live( "shell rejects via busy callback and cancel still stops the first shell", Effect.gen(function* () { const s = yield* Scope.Scope @@ -323,7 +323,7 @@ describe("Runner", () => { }), ) - it.effect( + it.live( "cancel interrupts shell that ignores abort signal", Effect.gen(function* () { const s = yield* Scope.Scope @@ -349,7 +349,7 @@ describe("Runner", () => { // --- shell→run handoff --- - it.effect( + it.live( "ensureRunning queues behind shell then runs after", Effect.gen(function* () { const s = yield* Scope.Scope @@ -376,7 +376,7 @@ describe("Runner", () => { }), ) - it.effect( + it.live( "multiple ensureRunning callers share the queued run behind shell", Effect.gen(function* () { const s = yield* Scope.Scope @@ -407,7 +407,7 @@ describe("Runner", () => { }), ) - it.effect( + it.live( "cancel during shell_then_run cancels both", Effect.gen(function* () { const s = yield* Scope.Scope @@ -441,7 +441,7 @@ describe("Runner", () => { // --- lifecycle callbacks --- - it.effect( + it.live( "onIdle fires when returning to idle from running", Effect.gen(function* () { const s = yield* Scope.Scope @@ -454,7 +454,7 @@ describe("Runner", () => { }), ) - it.effect( + it.live( "onIdle fires on cancel", Effect.gen(function* () { const s = yield* Scope.Scope @@ -470,7 +470,7 @@ describe("Runner", () => { }), ) - it.effect( + it.live( "onBusy fires when shell starts", Effect.gen(function* () { const s = yield* Scope.Scope @@ -485,7 +485,7 @@ describe("Runner", () => { // --- busy flag --- - it.effect( + it.live( "busy is true during run", Effect.gen(function* () { const s = yield* Scope.Scope @@ -502,7 +502,7 @@ describe("Runner", () => { }), ) - it.effect( + it.live( "busy is true during shell", Effect.gen(function* () { const s = yield* Scope.Scope diff --git a/packages/opencode/test/fixture/fixture.ts b/packages/opencode/test/fixture/fixture.ts index a36a3f9d84dc..a50e0c4f614d 100644 --- a/packages/opencode/test/fixture/fixture.ts +++ b/packages/opencode/test/fixture/fixture.ts @@ -2,10 +2,14 @@ import { $ } from "bun" import * as fs from "fs/promises" import os from "os" import path from "path" -import { Effect, FileSystem, ServiceMap } from "effect" +import { Effect, ServiceMap } from "effect" +import type * as PlatformError from "effect/PlatformError" +import type * as Scope from "effect/Scope" import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process" import type { Config } from "../../src/config/config" +import { InstanceRef } from "../../src/effect/instance-ref" import { Instance } from "../../src/project/instance" +import { TestLLMServer } from "../lib/llm-server" // Strip null bytes from paths (defensive fix for CI environment issues) function sanitizePath(p: string): string { @@ -78,9 +82,17 @@ export async function tmpdir(options?: TmpDirOptions) { /** Effectful scoped tmpdir. Cleaned up when the scope closes. Make sure these stay in sync */ export function tmpdirScoped(options?: { git?: boolean; config?: Partial }) { return Effect.gen(function* () { - const fs = yield* FileSystem.FileSystem const spawner = yield* ChildProcessSpawner.ChildProcessSpawner - const dir = yield* fs.makeTempDirectoryScoped({ prefix: "opencode-test-" }) + const dirpath = sanitizePath(path.join(os.tmpdir(), "opencode-test-" + Math.random().toString(36).slice(2))) + yield* Effect.promise(() => fs.mkdir(dirpath, { recursive: true })) + const dir = sanitizePath(yield* Effect.promise(() => fs.realpath(dirpath))) + + yield* Effect.addFinalizer(() => + Effect.promise(async () => { + if (options?.git) await stop(dir).catch(() => undefined) + await clean(dir).catch(() => undefined) + }), + ) const git = (...args: string[]) => spawner.spawn(ChildProcess.make("git", args, { cwd: dir })).pipe(Effect.flatMap((handle) => handle.exitCode)) @@ -94,9 +106,11 @@ export function tmpdirScoped(options?: { git?: boolean; config?: Partial + fs.writeFile( + path.join(dir, "opencode.json"), + JSON.stringify({ $schema: "https://opencode.ai/config.json", ...options.config }), + ), ) } @@ -111,7 +125,7 @@ export const provideInstance = Effect.promise(async () => Instance.provide({ directory, - fn: () => Effect.runPromiseWith(services)(self), + fn: () => Effect.runPromiseWith(services)(self.pipe(Effect.provideService(InstanceRef, Instance.current))), }), ), ) @@ -139,3 +153,20 @@ export function provideTmpdirInstance( return yield* self(path).pipe(provideInstance(path)) }) } + +export function provideTmpdirServer( + self: (input: { dir: string; llm: TestLLMServer["Service"] }) => Effect.Effect, + options?: { git?: boolean; config?: (url: string) => Partial }, +): Effect.Effect< + A, + E | PlatformError.PlatformError, + R | TestLLMServer | ChildProcessSpawner.ChildProcessSpawner | Scope.Scope +> { + return Effect.gen(function* () { + const llm = yield* TestLLMServer + return yield* provideTmpdirInstance((dir) => self({ dir, llm }), { + git: options?.git, + config: options?.config?.(llm.url), + }) + }) +} diff --git a/packages/opencode/test/format/format.test.ts b/packages/opencode/test/format/format.test.ts index 74336e02a394..95fe763d4116 100644 --- a/packages/opencode/test/format/format.test.ts +++ b/packages/opencode/test/format/format.test.ts @@ -10,7 +10,7 @@ import * as Formatter from "../../src/format/formatter" const it = testEffect(Layer.mergeAll(Format.defaultLayer, CrossSpawnSpawner.defaultLayer, NodeFileSystem.layer)) describe("Format", () => { - it.effect("status() returns built-in formatters when no config overrides", () => + it.live("status() returns built-in formatters when no config overrides", () => provideTmpdirInstance(() => Format.Service.use((fmt) => Effect.gen(function* () { @@ -32,7 +32,7 @@ describe("Format", () => { ), ) - it.effect("status() returns empty list when formatter is disabled", () => + it.live("status() returns empty list when formatter is disabled", () => provideTmpdirInstance( () => Format.Service.use((fmt) => @@ -44,7 +44,7 @@ describe("Format", () => { ), ) - it.effect("status() excludes formatters marked as disabled in config", () => + it.live("status() excludes formatters marked as disabled in config", () => provideTmpdirInstance( () => Format.Service.use((fmt) => @@ -64,11 +64,11 @@ describe("Format", () => { ), ) - it.effect("service initializes without error", () => + it.live("service initializes without error", () => provideTmpdirInstance(() => Format.Service.use(() => Effect.void)), ) - it.effect("status() initializes formatter state per directory", () => + it.live("status() initializes formatter state per directory", () => Effect.gen(function* () { const a = yield* provideTmpdirInstance(() => Format.Service.use((fmt) => fmt.status()), { config: { formatter: false }, @@ -80,7 +80,7 @@ describe("Format", () => { }), ) - it.effect("runs enabled checks for matching formatters in parallel", () => + it.live("runs enabled checks for matching formatters in parallel", () => provideTmpdirInstance((path) => Effect.gen(function* () { const file = `${path}/test.parallel` @@ -144,7 +144,7 @@ describe("Format", () => { ), ) - it.effect("runs matching formatters sequentially for the same file", () => + it.live("runs matching formatters sequentially for the same file", () => provideTmpdirInstance( (path) => Effect.gen(function* () { diff --git a/packages/opencode/test/lib/effect.ts b/packages/opencode/test/lib/effect.ts index 4162ba0924ee..131ec5cc6bc2 100644 --- a/packages/opencode/test/lib/effect.ts +++ b/packages/opencode/test/lib/effect.ts @@ -1,14 +1,14 @@ import { test, type TestOptions } from "bun:test" import { Cause, Effect, Exit, Layer } from "effect" import type * as Scope from "effect/Scope" +import * as TestClock from "effect/testing/TestClock" import * as TestConsole from "effect/testing/TestConsole" type Body = Effect.Effect | (() => Effect.Effect) -const env = TestConsole.layer const body = (value: Body) => Effect.suspend(() => (typeof value === "function" ? value() : value)) -const run = (value: Body, layer: Layer.Layer) => +const run = (value: Body, layer: Layer.Layer) => Effect.gen(function* () { const exit = yield* body(value).pipe(Effect.scoped, Effect.provide(layer), Effect.exit) if (Exit.isFailure(exit)) { @@ -19,19 +19,35 @@ const run = (value: Body, layer: Layer.Layer return yield* exit }).pipe(Effect.runPromise) -const make = (layer: Layer.Layer) => { +const make = (testLayer: Layer.Layer, liveLayer: Layer.Layer) => { const effect = (name: string, value: Body, opts?: number | TestOptions) => - test(name, () => run(value, layer), opts) + test(name, () => run(value, testLayer), opts) effect.only = (name: string, value: Body, opts?: number | TestOptions) => - test.only(name, () => run(value, layer), opts) + test.only(name, () => run(value, testLayer), opts) effect.skip = (name: string, value: Body, opts?: number | TestOptions) => - test.skip(name, () => run(value, layer), opts) + test.skip(name, () => run(value, testLayer), opts) - return { effect } + const live = (name: string, value: Body, opts?: number | TestOptions) => + test(name, () => run(value, liveLayer), opts) + + live.only = (name: string, value: Body, opts?: number | TestOptions) => + test.only(name, () => run(value, liveLayer), opts) + + live.skip = (name: string, value: Body, opts?: number | TestOptions) => + test.skip(name, () => run(value, liveLayer), opts) + + return { effect, live } } -export const it = make(env) +// Test environment with TestClock and TestConsole +const testEnv = Layer.mergeAll(TestConsole.layer, TestClock.layer()) + +// Live environment - uses real clock, but keeps TestConsole for output capture +const liveEnv = TestConsole.layer + +export const it = make(testEnv, liveEnv) -export const testEffect = (layer: Layer.Layer) => make(Layer.provideMerge(layer, env)) +export const testEffect = (layer: Layer.Layer) => + make(Layer.provideMerge(layer, testEnv), Layer.provideMerge(layer, liveEnv)) diff --git a/packages/opencode/test/lib/llm-server.ts b/packages/opencode/test/lib/llm-server.ts new file mode 100644 index 000000000000..b0a54424efc5 --- /dev/null +++ b/packages/opencode/test/lib/llm-server.ts @@ -0,0 +1,282 @@ +import { NodeHttpServer } from "@effect/platform-node" +import * as Http from "node:http" +import { Deferred, Effect, Layer, ServiceMap, Stream } from "effect" +import * as HttpServer from "effect/unstable/http/HttpServer" +import { HttpRouter, HttpServerRequest, HttpServerResponse } from "effect/unstable/http" + +type Step = + | { + type: "text" + text: string + } + | { + type: "tool" + tool: string + input: unknown + } + | { + type: "fail" + message: string + } + | { + type: "hang" + } + | { + type: "hold" + text: string + wait: PromiseLike + } + +type Hit = { + url: URL + body: Record +} + +type Wait = { + count: number + ready: Deferred.Deferred +} + +function sse(lines: unknown[]) { + return HttpServerResponse.stream( + Stream.fromIterable([ + [...lines.map((line) => `data: ${JSON.stringify(line)}`), "data: [DONE]"].join("\n\n") + "\n\n", + ]).pipe(Stream.encodeText), + { contentType: "text/event-stream" }, + ) +} + +function text(step: Extract) { + return sse([ + { + id: "chatcmpl-test", + object: "chat.completion.chunk", + choices: [{ delta: { role: "assistant" } }], + }, + { + id: "chatcmpl-test", + object: "chat.completion.chunk", + choices: [{ delta: { content: step.text } }], + }, + { + id: "chatcmpl-test", + object: "chat.completion.chunk", + choices: [{ delta: {}, finish_reason: "stop" }], + }, + ]) +} + +function tool(step: Extract, seq: number) { + const id = `call_${seq}` + const args = JSON.stringify(step.input) + return sse([ + { + id: "chatcmpl-test", + object: "chat.completion.chunk", + choices: [{ delta: { role: "assistant" } }], + }, + { + id: "chatcmpl-test", + object: "chat.completion.chunk", + choices: [ + { + delta: { + tool_calls: [ + { + index: 0, + id, + type: "function", + function: { + name: step.tool, + arguments: "", + }, + }, + ], + }, + }, + ], + }, + { + id: "chatcmpl-test", + object: "chat.completion.chunk", + choices: [ + { + delta: { + tool_calls: [ + { + index: 0, + function: { + arguments: args, + }, + }, + ], + }, + }, + ], + }, + { + id: "chatcmpl-test", + object: "chat.completion.chunk", + choices: [{ delta: {}, finish_reason: "tool_calls" }], + }, + ]) +} + +function fail(step: Extract) { + return HttpServerResponse.stream( + Stream.fromIterable([ + 'data: {"id":"chatcmpl-test","object":"chat.completion.chunk","choices":[{"delta":{"role":"assistant"}}]}\n\n', + ]).pipe(Stream.encodeText, Stream.concat(Stream.fail(new Error(step.message)))), + { contentType: "text/event-stream" }, + ) +} + +function hang() { + return HttpServerResponse.stream( + Stream.fromIterable([ + 'data: {"id":"chatcmpl-test","object":"chat.completion.chunk","choices":[{"delta":{"role":"assistant"}}]}\n\n', + ]).pipe(Stream.encodeText, Stream.concat(Stream.never)), + { contentType: "text/event-stream" }, + ) +} + +function hold(step: Extract) { + return HttpServerResponse.stream( + Stream.fromIterable([ + 'data: {"id":"chatcmpl-test","object":"chat.completion.chunk","choices":[{"delta":{"role":"assistant"}}]}\n\n', + ]).pipe( + Stream.encodeText, + Stream.concat( + Stream.fromEffect(Effect.promise(() => step.wait)).pipe( + Stream.flatMap(() => + Stream.fromIterable([ + `data: ${JSON.stringify({ + id: "chatcmpl-test", + object: "chat.completion.chunk", + choices: [{ delta: { content: step.text } }], + })}\n\n`, + `data: ${JSON.stringify({ + id: "chatcmpl-test", + object: "chat.completion.chunk", + choices: [{ delta: {}, finish_reason: "stop" }], + })}\n\n`, + "data: [DONE]\n\n", + ]).pipe(Stream.encodeText), + ), + ), + ), + ), + { contentType: "text/event-stream" }, + ) +} + +namespace TestLLMServer { + export interface Service { + readonly url: string + readonly text: (value: string) => Effect.Effect + readonly tool: (tool: string, input: unknown) => Effect.Effect + readonly fail: (message?: string) => Effect.Effect + readonly hang: Effect.Effect + readonly hold: (text: string, wait: PromiseLike) => Effect.Effect + readonly hits: Effect.Effect + readonly calls: Effect.Effect + readonly wait: (count: number) => Effect.Effect + readonly inputs: Effect.Effect[]> + readonly pending: Effect.Effect + } +} + +export class TestLLMServer extends ServiceMap.Service()("@test/LLMServer") { + static readonly layer = Layer.effect( + TestLLMServer, + Effect.gen(function* () { + const server = yield* HttpServer.HttpServer + const router = yield* HttpRouter.HttpRouter + + let hits: Hit[] = [] + let list: Step[] = [] + let seq = 0 + let waits: Wait[] = [] + + const push = (step: Step) => { + list = [...list, step] + } + + const notify = Effect.fnUntraced(function* () { + const ready = waits.filter((item) => hits.length >= item.count) + if (!ready.length) return + waits = waits.filter((item) => hits.length < item.count) + yield* Effect.forEach(ready, (item) => Deferred.succeed(item.ready, void 0)) + }) + + const pull = () => { + const step = list[0] + if (!step) return { step: undefined, seq } + seq += 1 + list = list.slice(1) + return { step, seq } + } + + yield* router.add( + "POST", + "/v1/chat/completions", + Effect.gen(function* () { + const req = yield* HttpServerRequest.HttpServerRequest + const next = pull() + if (!next.step) return HttpServerResponse.text("unexpected request", { status: 500 }) + const json = yield* req.json.pipe(Effect.orElseSucceed(() => ({}))) + hits = [ + ...hits, + { + url: new URL(req.originalUrl, "http://localhost"), + body: json && typeof json === "object" ? (json as Record) : {}, + }, + ] + yield* notify() + if (next.step.type === "text") return text(next.step) + if (next.step.type === "tool") return tool(next.step, next.seq) + if (next.step.type === "fail") return fail(next.step) + if (next.step.type === "hang") return hang() + return hold(next.step) + }), + ) + + yield* server.serve(router.asHttpEffect()) + + return TestLLMServer.of({ + url: + server.address._tag === "TcpAddress" + ? `http://127.0.0.1:${server.address.port}/v1` + : `unix://${server.address.path}/v1`, + text: Effect.fn("TestLLMServer.text")(function* (value: string) { + push({ type: "text", text: value }) + }), + tool: Effect.fn("TestLLMServer.tool")(function* (tool: string, input: unknown) { + push({ type: "tool", tool, input }) + }), + fail: Effect.fn("TestLLMServer.fail")(function* (message = "boom") { + push({ type: "fail", message }) + }), + hang: Effect.gen(function* () { + push({ type: "hang" }) + }).pipe(Effect.withSpan("TestLLMServer.hang")), + hold: Effect.fn("TestLLMServer.hold")(function* (text: string, wait: PromiseLike) { + push({ type: "hold", text, wait }) + }), + hits: Effect.sync(() => [...hits]), + calls: Effect.sync(() => hits.length), + wait: Effect.fn("TestLLMServer.wait")(function* (count: number) { + if (hits.length >= count) return + const ready = yield* Deferred.make() + waits = [...waits, { count, ready }] + yield* Deferred.await(ready) + }), + inputs: Effect.sync(() => hits.map((hit) => hit.body)), + pending: Effect.sync(() => list.length), + }) + }), + ).pipe( + Layer.provide(HttpRouter.layer), // + Layer.provide(NodeHttpServer.layer(() => Http.createServer(), { port: 0 })), + ) +} diff --git a/packages/opencode/test/session/processor-effect.test.ts b/packages/opencode/test/session/processor-effect.test.ts index 0dfdef26f61e..23c6911a2cb5 100644 --- a/packages/opencode/test/session/processor-effect.test.ts +++ b/packages/opencode/test/session/processor-effect.test.ts @@ -264,7 +264,7 @@ const env = SessionProcessor.layer.pipe(Layer.provideMerge(deps)) const it = testEffect(env) -it.effect("session.processor effect tests capture llm input cleanly", () => { +it.live("session.processor effect tests capture llm input cleanly", () => { return provideTmpdirInstance( (dir) => Effect.gen(function* () { @@ -316,7 +316,7 @@ it.effect("session.processor effect tests capture llm input cleanly", () => { ) }) -it.effect("session.processor effect tests stop after token overflow requests compaction", () => { +it.live("session.processor effect tests stop after token overflow requests compaction", () => { return provideTmpdirInstance( (dir) => Effect.gen(function* () { @@ -376,7 +376,7 @@ it.effect("session.processor effect tests stop after token overflow requests com ) }) -it.effect("session.processor effect tests reset reasoning state across retries", () => { +it.live("session.processor effect tests reset reasoning state across retries", () => { return provideTmpdirInstance( (dir) => Effect.gen(function* () { @@ -449,7 +449,7 @@ it.effect("session.processor effect tests reset reasoning state across retries", ) }) -it.effect("session.processor effect tests do not retry unknown json errors", () => { +it.live("session.processor effect tests do not retry unknown json errors", () => { return provideTmpdirInstance( (dir) => Effect.gen(function* () { @@ -495,7 +495,7 @@ it.effect("session.processor effect tests do not retry unknown json errors", () ) }) -it.effect("session.processor effect tests retry recognized structured json errors", () => { +it.live("session.processor effect tests retry recognized structured json errors", () => { return provideTmpdirInstance( (dir) => Effect.gen(function* () { @@ -544,7 +544,7 @@ it.effect("session.processor effect tests retry recognized structured json error ) }) -it.effect("session.processor effect tests publish retry status updates", () => { +it.live("session.processor effect tests publish retry status updates", () => { return provideTmpdirInstance( (dir) => Effect.gen(function* () { @@ -611,7 +611,7 @@ it.effect("session.processor effect tests publish retry status updates", () => { ) }) -it.effect("session.processor effect tests compact on structured context overflow", () => { +it.live("session.processor effect tests compact on structured context overflow", () => { return provideTmpdirInstance( (dir) => Effect.gen(function* () { @@ -656,7 +656,7 @@ it.effect("session.processor effect tests compact on structured context overflow ) }) -it.effect("session.processor effect tests mark pending tools as aborted on cleanup", () => { +it.live("session.processor effect tests mark pending tools as aborted on cleanup", () => { return provideTmpdirInstance( (dir) => Effect.gen(function* () { @@ -725,7 +725,7 @@ it.effect("session.processor effect tests mark pending tools as aborted on clean ) }) -it.effect("session.processor effect tests record aborted errors and idle state", () => { +it.live("session.processor effect tests record aborted errors and idle state", () => { return provideTmpdirInstance( (dir) => Effect.gen(function* () { @@ -807,7 +807,7 @@ it.effect("session.processor effect tests record aborted errors and idle state", ) }) -it.effect("session.processor effect tests mark interruptions aborted without manual abort", () => { +it.live("session.processor effect tests mark interruptions aborted without manual abort", () => { return provideTmpdirInstance( (dir) => Effect.gen(function* () { diff --git a/packages/opencode/test/session/prompt-effect.test.ts b/packages/opencode/test/session/prompt-effect.test.ts index ef664113f382..98111bb3a269 100644 --- a/packages/opencode/test/session/prompt-effect.test.ts +++ b/packages/opencode/test/session/prompt-effect.test.ts @@ -1,7 +1,6 @@ import { NodeFileSystem } from "@effect/platform-node" import { expect, spyOn } from "bun:test" -import { Cause, Effect, Exit, Fiber, Layer, ServiceMap } from "effect" -import * as Stream from "effect/Stream" +import { Cause, Effect, Exit, Fiber, Layer } from "effect" import z from "zod" import type { Agent } from "../../src/agent/agent" import { Agent as AgentSvc } from "../../src/agent/agent" @@ -31,8 +30,9 @@ import { ToolRegistry } from "../../src/tool/registry" import { Truncate } from "../../src/tool/truncate" import { Log } from "../../src/util/log" import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner" -import { provideTmpdirInstance } from "../fixture/fixture" +import { provideTmpdirInstance, provideTmpdirServer } from "../fixture/fixture" import { testEffect } from "../lib/effect" +import { TestLLMServer } from "../lib/llm-server" Log.init({ print: false }) @@ -41,105 +41,6 @@ const ref = { modelID: ModelID.make("test-model"), } -type Script = Stream.Stream | ((input: LLM.StreamInput) => Stream.Stream) - -class TestLLM extends ServiceMap.Service< - TestLLM, - { - readonly push: (stream: Script) => Effect.Effect - readonly reply: (...items: LLM.Event[]) => Effect.Effect - readonly calls: Effect.Effect - readonly inputs: Effect.Effect - } ->()("@test/PromptLLM") {} - -function stream(...items: LLM.Event[]) { - return Stream.make(...items) -} - -function usage(input = 1, output = 1, total = input + output) { - return { - inputTokens: input, - outputTokens: output, - totalTokens: total, - inputTokenDetails: { - noCacheTokens: undefined, - cacheReadTokens: undefined, - cacheWriteTokens: undefined, - }, - outputTokenDetails: { - textTokens: undefined, - reasoningTokens: undefined, - }, - } -} - -function start(): LLM.Event { - return { type: "start" } -} - -function textStart(id = "t"): LLM.Event { - return { type: "text-start", id } -} - -function textDelta(id: string, text: string): LLM.Event { - return { type: "text-delta", id, text } -} - -function textEnd(id = "t"): LLM.Event { - return { type: "text-end", id } -} - -function finishStep(): LLM.Event { - return { - type: "finish-step", - finishReason: "stop", - rawFinishReason: "stop", - response: { id: "res", modelId: "test-model", timestamp: new Date() }, - providerMetadata: undefined, - usage: usage(), - } -} - -function finish(): LLM.Event { - return { type: "finish", finishReason: "stop", rawFinishReason: "stop", totalUsage: usage() } -} - -function finishToolCallsStep(): LLM.Event { - return { - type: "finish-step", - finishReason: "tool-calls", - rawFinishReason: "tool_calls", - response: { id: "res", modelId: "test-model", timestamp: new Date() }, - providerMetadata: undefined, - usage: usage(), - } -} - -function finishToolCalls(): LLM.Event { - return { type: "finish", finishReason: "tool-calls", rawFinishReason: "tool_calls", totalUsage: usage() } -} - -function replyStop(text: string, id = "t") { - return [start(), textStart(id), textDelta(id, text), textEnd(id), finishStep(), finish()] as const -} - -function replyToolCalls(text: string, id = "t") { - return [start(), textStart(id), textDelta(id, text), textEnd(id), finishToolCallsStep(), finishToolCalls()] as const -} - -function toolInputStart(id: string, toolName: string): LLM.Event { - return { type: "tool-input-start", id, toolName } -} - -function toolCall(toolCallId: string, toolName: string, input: unknown): LLM.Event { - return { type: "tool-call", toolCallId, toolName, input } -} - -function hang(_input: LLM.StreamInput, ...items: LLM.Event[]) { - return stream(...items).pipe(Stream.concat(Stream.fromEffect(Effect.never))) -} - function defer() { let resolve!: (value: T | PromiseLike) => void const promise = new Promise((done) => { @@ -148,10 +49,6 @@ function defer() { return { promise, resolve } } -function waitMs(ms: number) { - return Effect.promise(() => new Promise((done) => setTimeout(done, ms))) -} - function withSh(fx: () => Effect.Effect) { return Effect.acquireUseRelease( Effect.sync(() => { @@ -189,43 +86,6 @@ function errorTool(parts: MessageV2.Part[]) { return part?.state.status === "error" ? (part as ErrorToolPart) : undefined } -const llm = Layer.unwrap( - Effect.gen(function* () { - const queue: Script[] = [] - const inputs: LLM.StreamInput[] = [] - let calls = 0 - - const push = Effect.fn("TestLLM.push")((item: Script) => { - queue.push(item) - return Effect.void - }) - - const reply = Effect.fn("TestLLM.reply")((...items: LLM.Event[]) => push(stream(...items))) - return Layer.mergeAll( - Layer.succeed( - LLM.Service, - LLM.Service.of({ - stream: (input) => { - calls += 1 - inputs.push(input) - const item = queue.shift() ?? Stream.empty - return typeof item === "function" ? item(input) : item - }, - }), - ), - Layer.succeed( - TestLLM, - TestLLM.of({ - push, - reply, - calls: Effect.sync(() => calls), - inputs: Effect.sync(() => [...inputs]), - }), - ), - ) - }), -) - const mcp = Layer.succeed( MCP.Service, MCP.Service.of({ @@ -281,35 +141,40 @@ const filetime = Layer.succeed( const status = SessionStatus.layer.pipe(Layer.provideMerge(Bus.layer)) const infra = Layer.mergeAll(NodeFileSystem.layer, CrossSpawnSpawner.defaultLayer) -const deps = Layer.mergeAll( - Session.defaultLayer, - Snapshot.defaultLayer, - AgentSvc.defaultLayer, - Command.defaultLayer, - Permission.layer, - Plugin.defaultLayer, - Config.defaultLayer, - filetime, - lsp, - mcp, - AppFileSystem.defaultLayer, - status, - llm, -).pipe(Layer.provideMerge(infra)) -const registry = ToolRegistry.layer.pipe(Layer.provideMerge(deps)) -const trunc = Truncate.layer.pipe(Layer.provideMerge(deps)) -const proc = SessionProcessor.layer.pipe(Layer.provideMerge(deps)) -const compact = SessionCompaction.layer.pipe(Layer.provideMerge(proc), Layer.provideMerge(deps)) -const env = SessionPrompt.layer.pipe( - Layer.provideMerge(compact), - Layer.provideMerge(proc), - Layer.provideMerge(registry), - Layer.provideMerge(trunc), - Layer.provideMerge(deps), -) +function makeHttp() { + const deps = Layer.mergeAll( + Session.defaultLayer, + Snapshot.defaultLayer, + LLM.defaultLayer, + AgentSvc.defaultLayer, + Command.defaultLayer, + Permission.layer, + Plugin.defaultLayer, + Config.defaultLayer, + filetime, + lsp, + mcp, + AppFileSystem.defaultLayer, + status, + ).pipe(Layer.provideMerge(infra)) + const registry = ToolRegistry.layer.pipe(Layer.provideMerge(deps)) + const trunc = Truncate.layer.pipe(Layer.provideMerge(deps)) + const proc = SessionProcessor.layer.pipe(Layer.provideMerge(deps)) + const compact = SessionCompaction.layer.pipe(Layer.provideMerge(proc), Layer.provideMerge(deps)) + return Layer.mergeAll( + TestLLMServer.layer, + SessionPrompt.layer.pipe( + Layer.provideMerge(compact), + Layer.provideMerge(proc), + Layer.provideMerge(registry), + Layer.provideMerge(trunc), + Layer.provideMerge(deps), + ), + ) +} -const it = testEffect(env) -const unix = process.platform !== "win32" ? it.effect : it.effect.skip +const it = testEffect(makeHttp()) +const unix = process.platform !== "win32" ? it.live : it.live.skip // Config that registers a custom "test" provider with a "test-model" model // so Provider.getModel("test", "test-model") succeeds inside the loop. @@ -342,6 +207,22 @@ const cfg = { }, } +function providerCfg(url: string) { + return { + ...cfg, + provider: { + ...cfg.provider, + test: { + ...cfg.provider.test, + options: { + ...cfg.provider.test.options, + baseURL: url, + }, + }, + }, + } +} + const user = Effect.fn("test.user")(function* (sessionID: SessionID, text: string) { const session = yield* Session.Service const msg = yield* session.updateMessage({ @@ -407,232 +288,300 @@ const addSubtask = (sessionID: SessionID, messageID: MessageID, model = ref) => }) const boot = Effect.fn("test.boot")(function* (input?: { title?: string }) { - const test = yield* TestLLM const prompt = yield* SessionPrompt.Service const sessions = yield* Session.Service const chat = yield* sessions.create(input ?? { title: "Pinned" }) - return { test, prompt, sessions, chat } + return { prompt, sessions, chat } }) // Loop semantics -it.effect("loop exits immediately when last assistant has stop finish", () => - provideTmpdirInstance( - (dir) => - Effect.gen(function* () { - const { test, prompt, chat } = yield* boot() - yield* seed(chat.id, { finish: "stop" }) - - const result = yield* prompt.loop({ sessionID: chat.id }) - expect(result.info.role).toBe("assistant") - if (result.info.role === "assistant") expect(result.info.finish).toBe("stop") - expect(yield* test.calls).toBe(0) - }), - { git: true }, +it.live("loop exits immediately when last assistant has stop finish", () => + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ title: "Pinned" }) + yield* seed(chat.id, { finish: "stop" }) + + const result = yield* prompt.loop({ sessionID: chat.id }) + expect(result.info.role).toBe("assistant") + if (result.info.role === "assistant") expect(result.info.finish).toBe("stop") + expect(yield* llm.calls).toBe(0) + }), + { git: true, config: providerCfg }, ), ) -it.effect("loop calls LLM and returns assistant message", () => - provideTmpdirInstance( - (dir) => - Effect.gen(function* () { - const { test, prompt, chat } = yield* boot() - yield* test.reply(...replyStop("world")) - yield* user(chat.id, "hello") - - const result = yield* prompt.loop({ sessionID: chat.id }) - expect(result.info.role).toBe("assistant") - const parts = result.parts.filter((p) => p.type === "text") - expect(parts.some((p) => p.type === "text" && p.text === "world")).toBe(true) - expect(yield* test.calls).toBe(1) - }), - { git: true, config: cfg }, +it.live("loop calls LLM and returns assistant message", () => + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ + title: "Pinned", + permission: [{ permission: "*", pattern: "*", action: "allow" }], + }) + yield* prompt.prompt({ + sessionID: chat.id, + agent: "build", + noReply: true, + parts: [{ type: "text", text: "hello" }], + }) + yield* llm.text("world") + + const result = yield* prompt.loop({ sessionID: chat.id }) + expect(result.info.role).toBe("assistant") + const parts = result.parts.filter((p) => p.type === "text") + expect(parts.some((p) => p.type === "text" && p.text === "world")).toBe(true) + expect(yield* llm.hits).toHaveLength(1) + }), + { git: true, config: providerCfg }, ), ) -it.effect("loop continues when finish is tool-calls", () => - provideTmpdirInstance( - (dir) => - Effect.gen(function* () { - const { test, prompt, chat } = yield* boot() - yield* test.reply(...replyToolCalls("first")) - yield* test.reply(...replyStop("second")) - yield* user(chat.id, "hello") +it.live("static loop returns assistant text through local provider", () => + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { + const session = yield* Effect.promise(() => + Session.create({ + title: "Prompt provider", + permission: [{ permission: "*", pattern: "*", action: "allow" }], + }), + ) - const result = yield* prompt.loop({ sessionID: chat.id }) - expect(yield* test.calls).toBe(2) - expect(result.info.role).toBe("assistant") - if (result.info.role === "assistant") { - expect(result.parts.some((part) => part.type === "text" && part.text === "second")).toBe(true) - expect(result.info.finish).toBe("stop") - } - }), - { git: true, config: cfg }, + yield* Effect.promise(() => + SessionPrompt.prompt({ + sessionID: session.id, + agent: "build", + noReply: true, + parts: [{ type: "text", text: "hello" }], + }), + ) + + yield* llm.text("world") + + const result = yield* Effect.promise(() => SessionPrompt.loop({ sessionID: session.id })) + expect(result.info.role).toBe("assistant") + expect(result.parts.some((part) => part.type === "text" && part.text === "world")).toBe(true) + expect(yield* llm.hits).toHaveLength(1) + expect(yield* llm.pending).toBe(0) + }), + { git: true, config: providerCfg }, ), ) -it.effect("failed subtask preserves metadata on error tool state", () => - provideTmpdirInstance( - (dir) => - Effect.gen(function* () { - const { test, prompt, chat } = yield* boot({ title: "Pinned" }) - yield* test.reply( - start(), - toolInputStart("task-1", "task"), - toolCall("task-1", "task", { - description: "inspect bug", - prompt: "look into the cache key path", - subagent_type: "general", - }), - { - type: "finish-step", - finishReason: "tool-calls", - rawFinishReason: "tool_calls", - response: { id: "res", modelId: "test-model", timestamp: new Date() }, - providerMetadata: undefined, - usage: usage(), - }, - { type: "finish", finishReason: "tool-calls", rawFinishReason: "tool_calls", totalUsage: usage() }, - ) - yield* test.reply(...replyStop("done")) - const msg = yield* user(chat.id, "hello") - yield* addSubtask(chat.id, msg.id) +it.live("static loop consumes queued replies across turns", () => + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { + const session = yield* Effect.promise(() => + Session.create({ + title: "Prompt provider turns", + permission: [{ permission: "*", pattern: "*", action: "allow" }], + }), + ) - const result = yield* prompt.loop({ sessionID: chat.id }) - expect(result.info.role).toBe("assistant") - expect(yield* test.calls).toBe(2) + yield* Effect.promise(() => + SessionPrompt.prompt({ + sessionID: session.id, + agent: "build", + noReply: true, + parts: [{ type: "text", text: "hello one" }], + }), + ) - const msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(chat.id))) - const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general") - expect(taskMsg?.info.role).toBe("assistant") - if (!taskMsg || taskMsg.info.role !== "assistant") return + yield* llm.text("world one") - const tool = errorTool(taskMsg.parts) - if (!tool) return + const first = yield* Effect.promise(() => SessionPrompt.loop({ sessionID: session.id })) + expect(first.info.role).toBe("assistant") + expect(first.parts.some((part) => part.type === "text" && part.text === "world one")).toBe(true) - expect(tool.state.error).toContain("Tool execution failed") - expect(tool.state.metadata).toBeDefined() - expect(tool.state.metadata?.sessionId).toBeDefined() - expect(tool.state.metadata?.model).toEqual({ - providerID: ProviderID.make("test"), - modelID: ModelID.make("missing-model"), - }) - }), + yield* Effect.promise(() => + SessionPrompt.prompt({ + sessionID: session.id, + agent: "build", + noReply: true, + parts: [{ type: "text", text: "hello two" }], + }), + ) + + yield* llm.text("world two") + + const second = yield* Effect.promise(() => SessionPrompt.loop({ sessionID: session.id })) + expect(second.info.role).toBe("assistant") + expect(second.parts.some((part) => part.type === "text" && part.text === "world two")).toBe(true) + + expect(yield* llm.hits).toHaveLength(2) + expect(yield* llm.pending).toBe(0) + }), + { git: true, config: providerCfg }, + ), +) + +it.live("loop continues when finish is tool-calls", () => + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const session = yield* sessions.create({ + title: "Pinned", + permission: [{ permission: "*", pattern: "*", action: "allow" }], + }) + yield* prompt.prompt({ + sessionID: session.id, + agent: "build", + noReply: true, + parts: [{ type: "text", text: "hello" }], + }) + yield* llm.tool("first", { value: "first" }) + yield* llm.text("second") + + const result = yield* prompt.loop({ sessionID: session.id }) + expect(yield* llm.calls).toBe(2) + expect(result.info.role).toBe("assistant") + if (result.info.role === "assistant") { + expect(result.parts.some((part) => part.type === "text" && part.text === "second")).toBe(true) + expect(result.info.finish).toBe("stop") + } + }), + { git: true, config: providerCfg }, + ), +) + +it.live("failed subtask preserves metadata on error tool state", () => + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ title: "Pinned" }) + yield* llm.tool("task", { + description: "inspect bug", + prompt: "look into the cache key path", + subagent_type: "general", + }) + yield* llm.text("done") + const msg = yield* user(chat.id, "hello") + yield* addSubtask(chat.id, msg.id) + + const result = yield* prompt.loop({ sessionID: chat.id }) + expect(result.info.role).toBe("assistant") + expect(yield* llm.calls).toBe(2) + + const msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(chat.id))) + const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general") + expect(taskMsg?.info.role).toBe("assistant") + if (!taskMsg || taskMsg.info.role !== "assistant") return + + const tool = errorTool(taskMsg.parts) + if (!tool) return + + expect(tool.state.error).toContain("Tool execution failed") + expect(tool.state.metadata).toBeDefined() + expect(tool.state.metadata?.sessionId).toBeDefined() + expect(tool.state.metadata?.model).toEqual({ + providerID: ProviderID.make("test"), + modelID: ModelID.make("missing-model"), + }) + }), { git: true, - config: { - ...cfg, + config: (url) => ({ + ...providerCfg(url), agent: { general: { model: "test/missing-model", }, }, - }, + }), }, ), ) -it.effect("loop sets status to busy then idle", () => - provideTmpdirInstance( - (dir) => - Effect.gen(function* () { - const test = yield* TestLLM +it.live( + "loop sets status to busy then idle", + () => + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { const prompt = yield* SessionPrompt.Service const sessions = yield* Session.Service - const bus = yield* Bus.Service + const status = yield* SessionStatus.Service - yield* test.reply(start(), textStart(), textDelta("t", "ok"), textEnd(), finishStep(), finish()) + yield* llm.hang const chat = yield* sessions.create({}) yield* user(chat.id, "hi") - const types: string[] = [] - const idle = defer() - const off = yield* bus.subscribeCallback(SessionStatus.Event.Status, (evt) => { - if (evt.properties.sessionID !== chat.id) return - types.push(evt.properties.status.type) - if (evt.properties.status.type === "idle") idle.resolve() - }) - - yield* prompt.loop({ sessionID: chat.id }) - yield* Effect.promise(() => idle.promise) - off() - - expect(types).toContain("busy") - expect(types[types.length - 1]).toBe("idle") + const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + yield* llm.wait(1) + expect((yield* status.get(chat.id)).type).toBe("busy") + yield* prompt.cancel(chat.id) + yield* Fiber.await(fiber) + expect((yield* status.get(chat.id)).type).toBe("idle") }), - { git: true, config: cfg }, - ), + { git: true, config: providerCfg }, + ), + 3_000, ) // Cancel semantics -it.effect( +it.live( "cancel interrupts loop and resolves with an assistant message", () => - provideTmpdirInstance( - (dir) => - Effect.gen(function* () { - const { test, prompt, chat } = yield* boot() - yield* seed(chat.id) - - // Make LLM hang so the loop blocks - yield* test.push((input) => hang(input, start())) + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ title: "Pinned" }) + yield* seed(chat.id) - // Seed a new user message so the loop enters the LLM path - yield* user(chat.id, "more") + yield* llm.hang - const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) - // Give the loop time to start - yield* waitMs(200) - yield* prompt.cancel(chat.id) + yield* user(chat.id, "more") - const exit = yield* Fiber.await(fiber) - expect(Exit.isSuccess(exit)).toBe(true) - if (Exit.isSuccess(exit)) { - expect(exit.value.info.role).toBe("assistant") - } - }), - { git: true, config: cfg }, + const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + yield* llm.wait(1) + yield* prompt.cancel(chat.id) + const exit = yield* Fiber.await(fiber) + expect(Exit.isSuccess(exit)).toBe(true) + if (Exit.isSuccess(exit)) { + expect(exit.value.info.role).toBe("assistant") + } + }), + { git: true, config: providerCfg }, ), - 30_000, + 3_000, ) -it.effect( +it.live( "cancel records MessageAbortedError on interrupted process", () => - provideTmpdirInstance( - (dir) => - Effect.gen(function* () { - const ready = defer() - const { test, prompt, chat } = yield* boot() - - yield* test.push((input) => - hang(input, start()).pipe( - Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)), - ), - ) - yield* user(chat.id, "hello") - - const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) - yield* Effect.promise(() => ready.promise) - yield* prompt.cancel(chat.id) + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ title: "Pinned" }) + yield* llm.hang + yield* user(chat.id, "hello") - const exit = yield* Fiber.await(fiber) - expect(Exit.isSuccess(exit)).toBe(true) - if (Exit.isSuccess(exit)) { - const info = exit.value.info - if (info.role === "assistant") { - expect(info.error?.name).toBe("MessageAbortedError") - } + const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + yield* llm.wait(1) + yield* prompt.cancel(chat.id) + const exit = yield* Fiber.await(fiber) + expect(Exit.isSuccess(exit)).toBe(true) + if (Exit.isSuccess(exit)) { + const info = exit.value.info + if (info.role === "assistant") { + expect(info.error?.name).toBe("MessageAbortedError") } - }), - { git: true, config: cfg }, + } + }), + { git: true, config: providerCfg }, ), - 30_000, + 3_000, ) -it.effect( +it.live( "cancel finalizes subtask tool state", () => provideTmpdirInstance( @@ -695,45 +644,38 @@ it.effect( 30_000, ) -it.effect( +it.live( "cancel with queued callers resolves all cleanly", () => - provideTmpdirInstance( - (dir) => - Effect.gen(function* () { - const ready = defer() - const { test, prompt, chat } = yield* boot() - - yield* test.push((input) => - hang(input, start()).pipe( - Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)), - ), - ) - yield* user(chat.id, "hello") - - const a = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) - yield* Effect.promise(() => ready.promise) - // Queue a second caller - const b = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) - yield* waitMs(50) - - yield* prompt.cancel(chat.id) + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ title: "Pinned" }) + yield* llm.hang + yield* user(chat.id, "hello") - const [exitA, exitB] = yield* Effect.all([Fiber.await(a), Fiber.await(b)]) - expect(Exit.isSuccess(exitA)).toBe(true) - expect(Exit.isSuccess(exitB)).toBe(true) - if (Exit.isSuccess(exitA) && Exit.isSuccess(exitB)) { - expect(exitA.value.info.id).toBe(exitB.value.info.id) - } - }), - { git: true, config: cfg }, + const a = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + yield* llm.wait(1) + const b = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + yield* Effect.sleep(50) + + yield* prompt.cancel(chat.id) + const [exitA, exitB] = yield* Effect.all([Fiber.await(a), Fiber.await(b)]) + expect(Exit.isSuccess(exitA)).toBe(true) + expect(Exit.isSuccess(exitB)).toBe(true) + if (Exit.isSuccess(exitA) && Exit.isSuccess(exitB)) { + expect(exitA.value.info.id).toBe(exitB.value.info.id) + } + }), + { git: true, config: providerCfg }, ), - 30_000, + 3_000, ) // Queue semantics -it.effect("concurrent loop callers get same result", () => +it.live("concurrent loop callers get same result", () => provideTmpdirInstance( (dir) => Effect.gen(function* () { @@ -752,152 +694,128 @@ it.effect("concurrent loop callers get same result", () => ), ) -it.effect("concurrent loop callers all receive same error result", () => - provideTmpdirInstance( - (dir) => - Effect.gen(function* () { - const { test, prompt, chat } = yield* boot() +it.live( + "concurrent loop callers all receive same error result", + () => + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ title: "Pinned" }) - // Push a stream that fails — the loop records the error on the assistant message - yield* test.push(Stream.fail(new Error("boom"))) + yield* llm.fail("boom") yield* user(chat.id, "hello") const [a, b] = yield* Effect.all([prompt.loop({ sessionID: chat.id }), prompt.loop({ sessionID: chat.id })], { concurrency: "unbounded", }) - - // Both callers get the same assistant with an error recorded expect(a.info.id).toBe(b.info.id) expect(a.info.role).toBe("assistant") - if (a.info.role === "assistant") { - expect(a.info.error).toBeDefined() - } - if (b.info.role === "assistant") { - expect(b.info.error).toBeDefined() - } }), - { git: true, config: cfg }, - ), + { git: true, config: providerCfg }, + ), + 3_000, ) -it.effect( +it.live( "prompt submitted during an active run is included in the next LLM input", () => - provideTmpdirInstance( - (dir) => - Effect.gen(function* () { - const ready = defer() - const gate = defer() - const { test, prompt, sessions, chat } = yield* boot() - - yield* test.push((_input) => - stream(start()).pipe( - Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)), - Stream.concat( - Stream.fromEffect(Effect.promise(() => gate.promise)).pipe( - Stream.flatMap(() => - stream(textStart("a"), textDelta("a", "first"), textEnd("a"), finishStep(), finish()), - ), - ), - ), - ), - ) - - const a = yield* prompt - .prompt({ - sessionID: chat.id, - agent: "build", - model: ref, - parts: [{ type: "text", text: "first" }], - }) - .pipe(Effect.forkChild) - - yield* Effect.promise(() => ready.promise) + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { + const gate = defer() + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ title: "Pinned" }) - const id = MessageID.ascending() - const b = yield* prompt - .prompt({ - sessionID: chat.id, - messageID: id, - agent: "build", - model: ref, - parts: [{ type: "text", text: "second" }], - }) - .pipe(Effect.forkChild) + yield* llm.hold("first", gate.promise) + yield* llm.text("second") - yield* Effect.promise(async () => { - const end = Date.now() + 5000 - while (Date.now() < end) { - const msgs = await Effect.runPromise(sessions.messages({ sessionID: chat.id })) - if (msgs.some((msg) => msg.info.role === "user" && msg.info.id === id)) return - await new Promise((done) => setTimeout(done, 20)) - } - throw new Error("timed out waiting for second prompt to save") + const a = yield* prompt + .prompt({ + sessionID: chat.id, + agent: "build", + model: ref, + parts: [{ type: "text", text: "first" }], }) + .pipe(Effect.forkChild) + + yield* llm.wait(1) + + const id = MessageID.ascending() + const b = yield* prompt + .prompt({ + sessionID: chat.id, + messageID: id, + agent: "build", + model: ref, + parts: [{ type: "text", text: "second" }], + }) + .pipe(Effect.forkChild) + + yield* Effect.promise(async () => { + const end = Date.now() + 5000 + while (Date.now() < end) { + const msgs = await Effect.runPromise(sessions.messages({ sessionID: chat.id })) + if (msgs.some((msg) => msg.info.role === "user" && msg.info.id === id)) return + await new Promise((done) => setTimeout(done, 20)) + } + throw new Error("timed out waiting for second prompt to save") + }) - yield* test.reply(...replyStop("second")) - gate.resolve() - - const [ea, eb] = yield* Effect.all([Fiber.await(a), Fiber.await(b)]) - expect(Exit.isSuccess(ea)).toBe(true) - expect(Exit.isSuccess(eb)).toBe(true) - expect(yield* test.calls).toBe(2) - - const msgs = yield* sessions.messages({ sessionID: chat.id }) - const assistants = msgs.filter((msg) => msg.info.role === "assistant") - expect(assistants).toHaveLength(2) - const last = assistants.at(-1) - if (!last || last.info.role !== "assistant") throw new Error("expected second assistant") - expect(last.info.parentID).toBe(id) - expect(last.parts.some((part) => part.type === "text" && part.text === "second")).toBe(true) - - const inputs = yield* test.inputs - expect(inputs).toHaveLength(2) - expect(JSON.stringify(inputs.at(-1)?.messages)).toContain("second") - }), - { git: true, config: cfg }, + gate.resolve() + + const [ea, eb] = yield* Effect.all([Fiber.await(a), Fiber.await(b)]) + expect(Exit.isSuccess(ea)).toBe(true) + expect(Exit.isSuccess(eb)).toBe(true) + expect(yield* llm.calls).toBe(2) + + const msgs = yield* sessions.messages({ sessionID: chat.id }) + const assistants = msgs.filter((msg) => msg.info.role === "assistant") + expect(assistants).toHaveLength(2) + const last = assistants.at(-1) + if (!last || last.info.role !== "assistant") throw new Error("expected second assistant") + expect(last.info.parentID).toBe(id) + expect(last.parts.some((part) => part.type === "text" && part.text === "second")).toBe(true) + + const inputs = yield* llm.inputs + expect(inputs).toHaveLength(2) + expect(JSON.stringify(inputs.at(-1)?.messages)).toContain("second") + }), + { git: true, config: providerCfg }, ), - 30_000, + 3_000, ) -it.effect( +it.live( "assertNotBusy throws BusyError when loop running", () => - provideTmpdirInstance( - (dir) => - Effect.gen(function* () { - const ready = defer() - const test = yield* TestLLM - const prompt = yield* SessionPrompt.Service - const sessions = yield* Session.Service - - yield* test.push((input) => - hang(input, start()).pipe( - Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)), - ), - ) + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + yield* llm.hang - const chat = yield* sessions.create({}) - yield* user(chat.id, "hi") + const chat = yield* sessions.create({}) + yield* user(chat.id, "hi") - const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) - yield* Effect.promise(() => ready.promise) + const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + yield* llm.wait(1) - const exit = yield* prompt.assertNotBusy(chat.id).pipe(Effect.exit) - expect(Exit.isFailure(exit)).toBe(true) - if (Exit.isFailure(exit)) { - expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError) - } + const exit = yield* prompt.assertNotBusy(chat.id).pipe(Effect.exit) + expect(Exit.isFailure(exit)).toBe(true) + if (Exit.isFailure(exit)) { + expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError) + } - yield* prompt.cancel(chat.id) - yield* Fiber.await(fiber) - }), - { git: true, config: cfg }, + yield* prompt.cancel(chat.id) + yield* Fiber.await(fiber) + }), + { git: true, config: providerCfg }, ), - 30_000, + 3_000, ) -it.effect("assertNotBusy succeeds when idle", () => +it.live("assertNotBusy succeeds when idle", () => provideTmpdirInstance( (dir) => Effect.gen(function* () { @@ -914,37 +832,32 @@ it.effect("assertNotBusy succeeds when idle", () => // Shell semantics -it.effect( +it.live( "shell rejects with BusyError when loop running", () => - provideTmpdirInstance( - (dir) => - Effect.gen(function* () { - const ready = defer() - const { test, prompt, chat } = yield* boot() - - yield* test.push((input) => - hang(input, start()).pipe( - Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)), - ), - ) - yield* user(chat.id, "hi") + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ title: "Pinned" }) + yield* llm.hang + yield* user(chat.id, "hi") - const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) - yield* Effect.promise(() => ready.promise) + const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + yield* llm.wait(1) - const exit = yield* prompt.shell({ sessionID: chat.id, agent: "build", command: "echo hi" }).pipe(Effect.exit) - expect(Exit.isFailure(exit)).toBe(true) - if (Exit.isFailure(exit)) { - expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError) - } + const exit = yield* prompt.shell({ sessionID: chat.id, agent: "build", command: "echo hi" }).pipe(Effect.exit) + expect(Exit.isFailure(exit)).toBe(true) + if (Exit.isFailure(exit)) { + expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError) + } - yield* prompt.cancel(chat.id) - yield* Fiber.await(fiber) - }), - { git: true, config: cfg }, + yield* prompt.cancel(chat.id) + yield* Fiber.await(fiber) + }), + { git: true, config: providerCfg }, ), - 30_000, + 3_000, ) unix("shell captures stdout and stderr in completed tool output", () => @@ -1006,74 +919,82 @@ unix( 30_000, ) -unix( +it.live( "loop waits while shell runs and starts after shell exits", () => - provideTmpdirInstance( - (dir) => - Effect.gen(function* () { - const { test, prompt, chat } = yield* boot() - yield* test.reply(...replyStop("after-shell")) + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ + title: "Pinned", + permission: [{ permission: "*", pattern: "*", action: "allow" }], + }) + yield* llm.text("after-shell") - const sh = yield* prompt - .shell({ sessionID: chat.id, agent: "build", command: "sleep 0.2" }) - .pipe(Effect.forkChild) - yield* waitMs(50) + const sh = yield* prompt + .shell({ sessionID: chat.id, agent: "build", command: "sleep 0.2" }) + .pipe(Effect.forkChild) + yield* Effect.sleep(50) - const run = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) - yield* waitMs(50) + const loop = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + yield* Effect.sleep(50) - expect(yield* test.calls).toBe(0) + expect(yield* llm.calls).toBe(0) - yield* Fiber.await(sh) - const exit = yield* Fiber.await(run) + yield* Fiber.await(sh) + const exit = yield* Fiber.await(loop) - expect(Exit.isSuccess(exit)).toBe(true) - if (Exit.isSuccess(exit)) { - expect(exit.value.info.role).toBe("assistant") - expect(exit.value.parts.some((part) => part.type === "text" && part.text === "after-shell")).toBe(true) - } - expect(yield* test.calls).toBe(1) - }), - { git: true, config: cfg }, + expect(Exit.isSuccess(exit)).toBe(true) + if (Exit.isSuccess(exit)) { + expect(exit.value.info.role).toBe("assistant") + expect(exit.value.parts.some((part) => part.type === "text" && part.text === "after-shell")).toBe(true) + } + expect(yield* llm.calls).toBe(1) + }), + { git: true, config: providerCfg }, ), - 30_000, + 3_000, ) -unix( +it.live( "shell completion resumes queued loop callers", () => - provideTmpdirInstance( - (dir) => - Effect.gen(function* () { - const { test, prompt, chat } = yield* boot() - yield* test.reply(...replyStop("done")) + provideTmpdirServer( + Effect.fnUntraced(function* ({ llm }) { + const prompt = yield* SessionPrompt.Service + const sessions = yield* Session.Service + const chat = yield* sessions.create({ + title: "Pinned", + permission: [{ permission: "*", pattern: "*", action: "allow" }], + }) + yield* llm.text("done") - const sh = yield* prompt - .shell({ sessionID: chat.id, agent: "build", command: "sleep 0.2" }) - .pipe(Effect.forkChild) - yield* waitMs(50) + const sh = yield* prompt + .shell({ sessionID: chat.id, agent: "build", command: "sleep 0.2" }) + .pipe(Effect.forkChild) + yield* Effect.sleep(50) - const a = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) - const b = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) - yield* waitMs(50) + const a = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + const b = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + yield* Effect.sleep(50) - expect(yield* test.calls).toBe(0) + expect(yield* llm.calls).toBe(0) - yield* Fiber.await(sh) - const [ea, eb] = yield* Effect.all([Fiber.await(a), Fiber.await(b)]) + yield* Fiber.await(sh) + const [ea, eb] = yield* Effect.all([Fiber.await(a), Fiber.await(b)]) - expect(Exit.isSuccess(ea)).toBe(true) - expect(Exit.isSuccess(eb)).toBe(true) - if (Exit.isSuccess(ea) && Exit.isSuccess(eb)) { - expect(ea.value.info.id).toBe(eb.value.info.id) - expect(ea.value.info.role).toBe("assistant") - } - expect(yield* test.calls).toBe(1) - }), - { git: true, config: cfg }, + expect(Exit.isSuccess(ea)).toBe(true) + expect(Exit.isSuccess(eb)).toBe(true) + if (Exit.isSuccess(ea) && Exit.isSuccess(eb)) { + expect(ea.value.info.id).toBe(eb.value.info.id) + expect(ea.value.info.role).toBe("assistant") + } + expect(yield* llm.calls).toBe(1) + }), + { git: true, config: providerCfg }, ), - 30_000, + 3_000, ) unix( @@ -1088,7 +1009,7 @@ unix( const sh = yield* prompt .shell({ sessionID: chat.id, agent: "build", command: "sleep 30" }) .pipe(Effect.forkChild) - yield* waitMs(50) + yield* Effect.sleep(50) yield* prompt.cancel(chat.id) @@ -1125,7 +1046,7 @@ unix( const sh = yield* prompt .shell({ sessionID: chat.id, agent: "build", command: "trap '' TERM; sleep 30" }) .pipe(Effect.forkChild) - yield* waitMs(50) + yield* Effect.sleep(50) yield* prompt.cancel(chat.id) @@ -1156,14 +1077,14 @@ unix( const sh = yield* prompt .shell({ sessionID: chat.id, agent: "build", command: "sleep 30" }) .pipe(Effect.forkChild) - yield* waitMs(50) + yield* Effect.sleep(50) - const run = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) - yield* waitMs(50) + const loop = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild) + yield* Effect.sleep(50) yield* prompt.cancel(chat.id) - const exit = yield* Fiber.await(run) + const exit = yield* Fiber.await(loop) expect(Exit.isSuccess(exit)).toBe(true) yield* Fiber.await(sh) @@ -1185,7 +1106,7 @@ unix( const a = yield* prompt .shell({ sessionID: chat.id, agent: "build", command: "sleep 30" }) .pipe(Effect.forkChild) - yield* waitMs(50) + yield* Effect.sleep(50) const exit = yield* prompt .shell({ sessionID: chat.id, agent: "build", command: "echo hi" }) diff --git a/packages/opencode/test/tool/truncation.test.ts b/packages/opencode/test/tool/truncation.test.ts index dba083c5123e..9ec5b784005c 100644 --- a/packages/opencode/test/tool/truncation.test.ts +++ b/packages/opencode/test/tool/truncation.test.ts @@ -140,7 +140,7 @@ describe("Truncate", () => { const DAY_MS = 24 * 60 * 60 * 1000 const it = testEffect(Layer.mergeAll(TruncateSvc.defaultLayer, NodeFileSystem.layer)) - it.effect("deletes files older than 7 days and preserves recent files", () => + it.live("deletes files older than 7 days and preserves recent files", () => Effect.gen(function* () { const fs = yield* FileSystem.FileSystem diff --git a/packages/sdk/js/src/v2/gen/types.gen.ts b/packages/sdk/js/src/v2/gen/types.gen.ts index 318b8907a91d..290c6fd5ec81 100644 --- a/packages/sdk/js/src/v2/gen/types.gen.ts +++ b/packages/sdk/js/src/v2/gen/types.gen.ts @@ -4,20 +4,6 @@ export type ClientOptions = { baseUrl: `${string}://${string}` | (string & {}) } -export type EventInstallationUpdated = { - type: "installation.updated" - properties: { - version: string - } -} - -export type EventInstallationUpdateAvailable = { - type: "installation.update-available" - properties: { - version: string - } -} - export type Project = { id: string worktree: string @@ -47,6 +33,20 @@ export type EventProjectUpdated = { properties: Project } +export type EventInstallationUpdated = { + type: "installation.updated" + properties: { + version: string + } +} + +export type EventInstallationUpdateAvailable = { + type: "installation.update-available" + properties: { + version: string + } +} + export type EventServerInstanceDisposed = { type: "server.instance.disposed" properties: { @@ -964,9 +964,9 @@ export type EventSessionDeleted = { } export type Event = + | EventProjectUpdated | EventInstallationUpdated | EventInstallationUpdateAvailable - | EventProjectUpdated | EventServerInstanceDisposed | EventServerConnected | EventGlobalDisposed