? z.infer : never
@@ -48,7 +50,7 @@ export namespace Tool {
export function define(
id: string,
- init: Info["init"] | Awaited<ReturnType<Info["init"]>>,
+ init: Info["init"] | Def,
): Info {
return {
id,
diff --git a/packages/opencode/test/effect/runner.test.ts b/packages/opencode/test/effect/runner.test.ts
new file mode 100644
index 000000000000..5d3488849c59
--- /dev/null
+++ b/packages/opencode/test/effect/runner.test.ts
@@ -0,0 +1,523 @@
+import { describe, expect, test } from "bun:test"
+import { Deferred, Effect, Exit, Fiber, Ref, Scope } from "effect"
+import { Runner } from "../../src/effect/runner"
+import { it } from "../lib/effect"
+
+describe("Runner", () => {
+ // --- ensureRunning semantics ---
+
+ it.effect(
+ "ensureRunning starts work and returns result",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make(s)
+ const result = yield* runner.ensureRunning(Effect.succeed("hello"))
+ expect(result).toBe("hello")
+ expect(runner.state._tag).toBe("Idle")
+ expect(runner.busy).toBe(false)
+ }),
+ )
+
+ it.effect(
+ "ensureRunning propagates work failures",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make(s)
+ const exit = yield* runner.ensureRunning(Effect.fail("boom")).pipe(Effect.exit)
+ expect(Exit.isFailure(exit)).toBe(true)
+ expect(runner.state._tag).toBe("Idle")
+ }),
+ )
+
+ it.effect(
+ "concurrent callers share the same run",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make(s)
+ const calls = yield* Ref.make(0)
+ const work = Effect.gen(function* () {
+ yield* Ref.update(calls, (n) => n + 1)
+ yield* Effect.sleep("10 millis")
+ return "shared"
+ })
+
+ const [a, b] = yield* Effect.all([runner.ensureRunning(work), runner.ensureRunning(work)], {
+ concurrency: "unbounded",
+ })
+
+ expect(a).toBe("shared")
+ expect(b).toBe("shared")
+ expect(yield* Ref.get(calls)).toBe(1)
+ }),
+ )
+
+ it.effect(
+ "concurrent callers all receive same error",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make(s)
+ const work = Effect.gen(function* () {
+ yield* Effect.sleep("10 millis")
+ return yield* Effect.fail("boom")
+ })
+
+ const [a, b] = yield* Effect.all(
+ [runner.ensureRunning(work).pipe(Effect.exit), runner.ensureRunning(work).pipe(Effect.exit)],
+ { concurrency: "unbounded" },
+ )
+
+ expect(Exit.isFailure(a)).toBe(true)
+ expect(Exit.isFailure(b)).toBe(true)
+ }),
+ )
+
+ it.effect(
+ "ensureRunning can be called again after previous run completes",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make(s)
+ expect(yield* runner.ensureRunning(Effect.succeed("first"))).toBe("first")
+ expect(yield* runner.ensureRunning(Effect.succeed("second"))).toBe("second")
+ }),
+ )
+
+ it.effect(
+ "second ensureRunning ignores new work if already running",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make(s)
+      const ran = yield* Ref.make<string[]>([])
+
+ const first = Effect.gen(function* () {
+ yield* Ref.update(ran, (a) => [...a, "first"])
+ yield* Effect.sleep("50 millis")
+ return "first-result"
+ })
+ const second = Effect.gen(function* () {
+ yield* Ref.update(ran, (a) => [...a, "second"])
+ return "second-result"
+ })
+
+ const [a, b] = yield* Effect.all([runner.ensureRunning(first), runner.ensureRunning(second)], {
+ concurrency: "unbounded",
+ })
+
+ expect(a).toBe("first-result")
+ expect(b).toBe("first-result")
+ expect(yield* Ref.get(ran)).toEqual(["first"])
+ }),
+ )
+
+ // --- cancel semantics ---
+
+ it.effect(
+ "cancel interrupts running work",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make(s)
+ const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("never"))).pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+ expect(runner.busy).toBe(true)
+ expect(runner.state._tag).toBe("Running")
+
+ yield* runner.cancel
+ expect(runner.busy).toBe(false)
+
+ const exit = yield* Fiber.await(fiber)
+ expect(Exit.isFailure(exit)).toBe(true)
+ }),
+ )
+
+ it.effect(
+ "cancel on idle is a no-op",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make(s)
+ yield* runner.cancel
+ expect(runner.busy).toBe(false)
+ }),
+ )
+
+ it.effect(
+ "cancel with onInterrupt resolves callers gracefully",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make(s, { onInterrupt: Effect.succeed("fallback") })
+ const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("never"))).pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+
+ yield* runner.cancel
+
+ const exit = yield* Fiber.await(fiber)
+ expect(Exit.isSuccess(exit)).toBe(true)
+ if (Exit.isSuccess(exit)) expect(exit.value).toBe("fallback")
+ }),
+ )
+
+ it.effect(
+ "cancel with queued callers resolves all",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make(s, { onInterrupt: Effect.succeed("fallback") })
+
+ const a = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+ const b = yield* runner.ensureRunning(Effect.succeed("y")).pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+
+ yield* runner.cancel
+
+ const [exitA, exitB] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
+ expect(Exit.isSuccess(exitA)).toBe(true)
+ expect(Exit.isSuccess(exitB)).toBe(true)
+ if (Exit.isSuccess(exitA)) expect(exitA.value).toBe("fallback")
+ if (Exit.isSuccess(exitB)) expect(exitB.value).toBe("fallback")
+ }),
+ )
+
+ it.effect(
+ "work can be started after cancel",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make(s)
+ const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+ yield* runner.cancel
+ yield* Fiber.await(fiber)
+
+ const result = yield* runner.ensureRunning(Effect.succeed("after-cancel"))
+ expect(result).toBe("after-cancel")
+ }),
+ )
+
+ test("cancel does not deadlock when replacement work starts before interrupted run exits", async () => {
+ function defer() {
+ let resolve!: () => void
+      const promise = new Promise<void>((done) => {
+ resolve = done
+ })
+ return { promise, resolve }
+ }
+
+ function fail(ms: number, msg: string) {
+    return new Promise<never>((_, reject) => {
+ setTimeout(() => reject(new Error(msg)), ms)
+ })
+ }
+
+ const s = await Effect.runPromise(Scope.make())
+ const hit = defer()
+ const hold = defer()
+ const done = defer()
+ try {
+ const runner = Runner.make(s)
+ const first = Effect.never.pipe(
+ Effect.onInterrupt(() => Effect.sync(() => hit.resolve())),
+ Effect.ensuring(Effect.promise(() => hold.promise)),
+ Effect.as("first"),
+ )
+
+ const a = Effect.runPromiseExit(runner.ensureRunning(first))
+ await Bun.sleep(10)
+
+ const stop = Effect.runPromise(runner.cancel)
+ await Promise.race([hit.promise, fail(250, "cancel did not interrupt running work")])
+
+ const b = Effect.runPromise(runner.ensureRunning(Effect.promise(() => done.promise).pipe(Effect.as("second"))))
+ expect(runner.busy).toBe(true)
+
+ hold.resolve()
+ await Promise.race([stop, fail(250, "cancel deadlocked while replacement run was active")])
+
+ expect(runner.busy).toBe(true)
+ done.resolve()
+ expect(await b).toBe("second")
+ expect(runner.busy).toBe(false)
+
+ const exit = await a
+ expect(Exit.isFailure(exit)).toBe(true)
+ } finally {
+ hold.resolve()
+ done.resolve()
+ await Promise.race([Effect.runPromise(Scope.close(s, Exit.void)), fail(1000, "runner scope did not close")])
+ }
+ })
+
+ // --- shell semantics ---
+
+ it.effect(
+ "shell runs exclusively",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make(s)
+ const result = yield* runner.startShell((_signal) => Effect.succeed("shell-done"))
+ expect(result).toBe("shell-done")
+ expect(runner.busy).toBe(false)
+ }),
+ )
+
+ it.effect(
+ "shell rejects when run is active",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make(s)
+ const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+
+ const exit = yield* runner.startShell((_s) => Effect.succeed("nope")).pipe(Effect.exit)
+ expect(Exit.isFailure(exit)).toBe(true)
+
+ yield* runner.cancel
+ yield* Fiber.await(fiber)
+ }),
+ )
+
+ it.effect(
+ "shell rejects when another shell is running",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make(s)
+      const gate = yield* Deferred.make<void>()
+
+ const sh = yield* runner
+ .startShell((_signal) => Deferred.await(gate).pipe(Effect.as("first")))
+ .pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+
+ const exit = yield* runner.startShell((_s) => Effect.succeed("second")).pipe(Effect.exit)
+ expect(Exit.isFailure(exit)).toBe(true)
+
+ yield* Deferred.succeed(gate, undefined)
+ yield* Fiber.await(sh)
+ }),
+ )
+
+ it.effect(
+ "shell rejects via busy callback and cancel still stops the first shell",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make(s, {
+ busy: () => {
+ throw new Error("busy")
+ },
+ })
+
+ const sh = yield* runner
+ .startShell((signal) =>
+ Effect.promise(
+ () =>
+ new Promise((resolve) => {
+ signal.addEventListener("abort", () => resolve("aborted"), { once: true })
+ }),
+ ),
+ )
+ .pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+
+ const exit = yield* runner.startShell((_s) => Effect.succeed("second")).pipe(Effect.exit)
+ expect(Exit.isFailure(exit)).toBe(true)
+
+ yield* runner.cancel
+ const done = yield* Fiber.await(sh)
+ expect(Exit.isSuccess(done)).toBe(true)
+ }),
+ )
+
+ it.effect(
+ "cancel interrupts shell that ignores abort signal",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make(s)
+      const gate = yield* Deferred.make<void>()
+
+ const sh = yield* runner
+ .startShell((_signal) => Deferred.await(gate).pipe(Effect.as("ignored")))
+ .pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+
+ const stop = yield* runner.cancel.pipe(Effect.forkChild)
+ const stopExit = yield* Fiber.await(stop).pipe(Effect.timeout("250 millis"))
+ expect(Exit.isSuccess(stopExit)).toBe(true)
+ expect(runner.busy).toBe(false)
+
+ const shellExit = yield* Fiber.await(sh)
+ expect(Exit.isFailure(shellExit)).toBe(true)
+
+ yield* Deferred.succeed(gate, undefined).pipe(Effect.ignore)
+ }),
+ )
+
+ // --- shell→run handoff ---
+
+ it.effect(
+ "ensureRunning queues behind shell then runs after",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make(s)
+      const gate = yield* Deferred.make<void>()
+
+ const sh = yield* runner
+ .startShell((_signal) => Deferred.await(gate).pipe(Effect.as("shell-result")))
+ .pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+ expect(runner.state._tag).toBe("Shell")
+
+ const run = yield* runner.ensureRunning(Effect.succeed("run-result")).pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+ expect(runner.state._tag).toBe("ShellThenRun")
+
+ yield* Deferred.succeed(gate, undefined)
+ yield* Fiber.await(sh)
+
+ const exit = yield* Fiber.await(run)
+ expect(Exit.isSuccess(exit)).toBe(true)
+ if (Exit.isSuccess(exit)) expect(exit.value).toBe("run-result")
+ expect(runner.state._tag).toBe("Idle")
+ }),
+ )
+
+ it.effect(
+ "multiple ensureRunning callers share the queued run behind shell",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make(s)
+ const calls = yield* Ref.make(0)
+      const gate = yield* Deferred.make<void>()
+
+ const sh = yield* runner
+ .startShell((_signal) => Deferred.await(gate).pipe(Effect.as("shell")))
+ .pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+
+ const work = Effect.gen(function* () {
+ yield* Ref.update(calls, (n) => n + 1)
+ return "run"
+ })
+ const a = yield* runner.ensureRunning(work).pipe(Effect.forkChild)
+ const b = yield* runner.ensureRunning(work).pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+
+ yield* Deferred.succeed(gate, undefined)
+ yield* Fiber.await(sh)
+
+ const [exitA, exitB] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
+ expect(Exit.isSuccess(exitA)).toBe(true)
+ expect(Exit.isSuccess(exitB)).toBe(true)
+ expect(yield* Ref.get(calls)).toBe(1)
+ }),
+ )
+
+ it.effect(
+ "cancel during shell_then_run cancels both",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make(s)
+      const gate = yield* Deferred.make<void>()
+
+ const sh = yield* runner
+ .startShell((signal) =>
+ Effect.promise(
+ () =>
+ new Promise((resolve) => {
+ signal.addEventListener("abort", () => resolve("aborted"), { once: true })
+ }),
+ ),
+ )
+ .pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+
+ const run = yield* runner.ensureRunning(Effect.succeed("y")).pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+ expect(runner.state._tag).toBe("ShellThenRun")
+
+ yield* runner.cancel
+ expect(runner.busy).toBe(false)
+
+ yield* Fiber.await(sh)
+ const exit = yield* Fiber.await(run)
+ expect(Exit.isFailure(exit)).toBe(true)
+ }),
+ )
+
+ // --- lifecycle callbacks ---
+
+ it.effect(
+ "onIdle fires when returning to idle from running",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const count = yield* Ref.make(0)
+ const runner = Runner.make(s, {
+ onIdle: Ref.update(count, (n) => n + 1),
+ })
+ yield* runner.ensureRunning(Effect.succeed("ok"))
+ expect(yield* Ref.get(count)).toBe(1)
+ }),
+ )
+
+ it.effect(
+ "onIdle fires on cancel",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const count = yield* Ref.make(0)
+ const runner = Runner.make(s, {
+ onIdle: Ref.update(count, (n) => n + 1),
+ })
+ const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+ yield* runner.cancel
+ yield* Fiber.await(fiber)
+ expect(yield* Ref.get(count)).toBeGreaterThanOrEqual(1)
+ }),
+ )
+
+ it.effect(
+ "onBusy fires when shell starts",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const count = yield* Ref.make(0)
+ const runner = Runner.make(s, {
+ onBusy: Ref.update(count, (n) => n + 1),
+ })
+ yield* runner.startShell((_signal) => Effect.succeed("done"))
+ expect(yield* Ref.get(count)).toBe(1)
+ }),
+ )
+
+ // --- busy flag ---
+
+ it.effect(
+ "busy is true during run",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make(s)
+      const gate = yield* Deferred.make<void>()
+
+ const fiber = yield* runner.ensureRunning(Deferred.await(gate).pipe(Effect.as("ok"))).pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+ expect(runner.busy).toBe(true)
+
+ yield* Deferred.succeed(gate, undefined)
+ yield* Fiber.await(fiber)
+ expect(runner.busy).toBe(false)
+ }),
+ )
+
+ it.effect(
+ "busy is true during shell",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make(s)
+      const gate = yield* Deferred.make<void>()
+
+ const fiber = yield* runner
+ .startShell((_signal) => Deferred.await(gate).pipe(Effect.as("ok")))
+ .pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+ expect(runner.busy).toBe(true)
+
+ yield* Deferred.succeed(gate, undefined)
+ yield* Fiber.await(fiber)
+ expect(runner.busy).toBe(false)
+ }),
+ )
+})
diff --git a/packages/opencode/test/server/session-list.test.ts b/packages/opencode/test/server/session-list.test.ts
index 675a89011f96..933b5b5b5a97 100644
--- a/packages/opencode/test/server/session-list.test.ts
+++ b/packages/opencode/test/server/session-list.test.ts
@@ -1,26 +1,30 @@
-import { describe, expect, test } from "bun:test"
-import path from "path"
+import { afterEach, describe, expect, test } from "bun:test"
import { Instance } from "../../src/project/instance"
import { Session } from "../../src/session"
import { Log } from "../../src/util/log"
+import { tmpdir } from "../fixture/fixture"
-const projectRoot = path.join(__dirname, "../..")
Log.init({ print: false })
+afterEach(async () => {
+ await Instance.disposeAll()
+})
+
describe("Session.list", () => {
test("filters by directory", async () => {
+ await using tmp = await tmpdir({ git: true })
await Instance.provide({
- directory: projectRoot,
+ directory: tmp.path,
fn: async () => {
const first = await Session.create({})
- const otherDir = path.join(projectRoot, "..", "__session_list_other")
+ await using other = await tmpdir({ git: true })
const second = await Instance.provide({
- directory: otherDir,
+ directory: other.path,
fn: async () => Session.create({}),
})
- const sessions = [...Session.list({ directory: projectRoot })]
+ const sessions = [...Session.list({ directory: tmp.path })]
const ids = sessions.map((s) => s.id)
expect(ids).toContain(first.id)
@@ -30,8 +34,9 @@ describe("Session.list", () => {
})
test("filters root sessions", async () => {
+ await using tmp = await tmpdir({ git: true })
await Instance.provide({
- directory: projectRoot,
+ directory: tmp.path,
fn: async () => {
const root = await Session.create({ title: "root-session" })
const child = await Session.create({ title: "child-session", parentID: root.id })
@@ -46,8 +51,9 @@ describe("Session.list", () => {
})
test("filters by start time", async () => {
+ await using tmp = await tmpdir({ git: true })
await Instance.provide({
- directory: projectRoot,
+ directory: tmp.path,
fn: async () => {
const session = await Session.create({ title: "new-session" })
const futureStart = Date.now() + 86400000
@@ -59,8 +65,9 @@ describe("Session.list", () => {
})
test("filters by search term", async () => {
+ await using tmp = await tmpdir({ git: true })
await Instance.provide({
- directory: projectRoot,
+ directory: tmp.path,
fn: async () => {
await Session.create({ title: "unique-search-term-abc" })
await Session.create({ title: "other-session-xyz" })
@@ -75,8 +82,9 @@ describe("Session.list", () => {
})
test("respects limit parameter", async () => {
+ await using tmp = await tmpdir({ git: true })
await Instance.provide({
- directory: projectRoot,
+ directory: tmp.path,
fn: async () => {
await Session.create({ title: "session-1" })
await Session.create({ title: "session-2" })
diff --git a/packages/opencode/test/server/session-messages.test.ts b/packages/opencode/test/server/session-messages.test.ts
index 91e0fd92634c..d7e44cbecc36 100644
--- a/packages/opencode/test/server/session-messages.test.ts
+++ b/packages/opencode/test/server/session-messages.test.ts
@@ -1,15 +1,18 @@
-import { describe, expect, test } from "bun:test"
-import path from "path"
+import { afterEach, describe, expect, test } from "bun:test"
import { Instance } from "../../src/project/instance"
import { Server } from "../../src/server/server"
import { Session } from "../../src/session"
import { MessageV2 } from "../../src/session/message-v2"
import { MessageID, PartID, type SessionID } from "../../src/session/schema"
import { Log } from "../../src/util/log"
+import { tmpdir } from "../fixture/fixture"
-const root = path.join(__dirname, "../..")
Log.init({ print: false })
+afterEach(async () => {
+ await Instance.disposeAll()
+})
+
async function fill(sessionID: SessionID, count: number, time = (i: number) => Date.now() + i) {
const ids = [] as MessageID[]
for (let i = 0; i < count; i++) {
@@ -38,8 +41,9 @@ async function fill(sessionID: SessionID, count: number, time = (i: number) => D
describe("session messages endpoint", () => {
test("returns cursor headers for older pages", async () => {
+ await using tmp = await tmpdir({ git: true })
await Instance.provide({
- directory: root,
+ directory: tmp.path,
fn: async () => {
const session = await Session.create({})
const ids = await fill(session.id, 5)
@@ -64,8 +68,9 @@ describe("session messages endpoint", () => {
})
test("keeps full-history responses when limit is omitted", async () => {
+ await using tmp = await tmpdir({ git: true })
await Instance.provide({
- directory: root,
+ directory: tmp.path,
fn: async () => {
const session = await Session.create({})
const ids = await fill(session.id, 3)
@@ -82,8 +87,9 @@ describe("session messages endpoint", () => {
})
test("rejects invalid cursors and missing sessions", async () => {
+ await using tmp = await tmpdir({ git: true })
await Instance.provide({
- directory: root,
+ directory: tmp.path,
fn: async () => {
const session = await Session.create({})
const app = Server.Default()
@@ -100,8 +106,9 @@ describe("session messages endpoint", () => {
})
test("does not truncate large legacy limit requests", async () => {
+ await using tmp = await tmpdir({ git: true })
await Instance.provide({
- directory: root,
+ directory: tmp.path,
fn: async () => {
const session = await Session.create({})
await fill(session.id, 520)
@@ -120,7 +127,7 @@ describe("session messages endpoint", () => {
describe("session.prompt_async error handling", () => {
test("prompt_async route has error handler for detached prompt call", async () => {
- const src = await Bun.file(path.join(import.meta.dir, "../../src/server/routes/session.ts")).text()
+ const src = await Bun.file(new URL("../../src/server/routes/session.ts", import.meta.url)).text()
const start = src.indexOf('"/:sessionID/prompt_async"')
const end = src.indexOf('"/:sessionID/command"', start)
expect(start).toBeGreaterThan(-1)
diff --git a/packages/opencode/test/server/session-select.test.ts b/packages/opencode/test/server/session-select.test.ts
index a336f8133c87..345b4314675b 100644
--- a/packages/opencode/test/server/session-select.test.ts
+++ b/packages/opencode/test/server/session-select.test.ts
@@ -1,17 +1,21 @@
-import { describe, expect, test } from "bun:test"
-import path from "path"
+import { afterEach, describe, expect, test } from "bun:test"
import { Session } from "../../src/session"
import { Log } from "../../src/util/log"
import { Instance } from "../../src/project/instance"
import { Server } from "../../src/server/server"
+import { tmpdir } from "../fixture/fixture"
-const projectRoot = path.join(__dirname, "../..")
Log.init({ print: false })
+afterEach(async () => {
+ await Instance.disposeAll()
+})
+
describe("tui.selectSession endpoint", () => {
test("should return 200 when called with valid session", async () => {
+ await using tmp = await tmpdir({ git: true })
await Instance.provide({
- directory: projectRoot,
+ directory: tmp.path,
fn: async () => {
// #given
const session = await Session.create({})
@@ -35,8 +39,9 @@ describe("tui.selectSession endpoint", () => {
})
test("should return 404 when session does not exist", async () => {
+ await using tmp = await tmpdir({ git: true })
await Instance.provide({
- directory: projectRoot,
+ directory: tmp.path,
fn: async () => {
// #given
const nonExistentSessionID = "ses_nonexistent123"
@@ -56,8 +61,9 @@ describe("tui.selectSession endpoint", () => {
})
test("should return 400 when session ID format is invalid", async () => {
+ await using tmp = await tmpdir({ git: true })
await Instance.provide({
- directory: projectRoot,
+ directory: tmp.path,
fn: async () => {
// #given
const invalidSessionID = "invalid_session_id"
diff --git a/packages/opencode/test/session/compaction.test.ts b/packages/opencode/test/session/compaction.test.ts
index 8f29b7788041..c08fef5633f7 100644
--- a/packages/opencode/test/session/compaction.test.ts
+++ b/packages/opencode/test/session/compaction.test.ts
@@ -129,7 +129,7 @@ async function tool(sessionID: SessionID, messageID: MessageID, tool: string, ou
}
function fake(
- input: Parameters<(typeof SessionProcessorModule.SessionProcessor)["create"]>[0],
+ input: Parameters<typeof SessionProcessorModule.SessionProcessor.create>[0],
result: "continue" | "compact",
) {
const msg = input.assistantMessage
@@ -540,7 +540,6 @@ describe("session.compaction.process", () => {
parentID: msg.id,
messages: msgs,
sessionID: session.id,
- abort: new AbortController().signal,
auto: false,
}),
),
@@ -580,7 +579,6 @@ describe("session.compaction.process", () => {
parentID: msg.id,
messages: msgs,
sessionID: session.id,
- abort: new AbortController().signal,
auto: false,
}),
),
@@ -621,7 +619,6 @@ describe("session.compaction.process", () => {
parentID: msg.id,
messages: msgs,
sessionID: session.id,
- abort: new AbortController().signal,
auto: true,
}),
),
@@ -675,7 +672,6 @@ describe("session.compaction.process", () => {
parentID: msg.id,
messages: msgs,
sessionID: session.id,
- abort: new AbortController().signal,
auto: true,
overflow: true,
}),
@@ -717,7 +713,6 @@ describe("session.compaction.process", () => {
parentID: msg.id,
messages: msgs,
sessionID: session.id,
- abort: new AbortController().signal,
auto: true,
overflow: true,
}),
@@ -792,7 +787,6 @@ describe("session.compaction.process", () => {
parentID: msg.id,
messages: msgs,
sessionID: session.id,
- abort: abort.signal,
auto: false,
}),
),
@@ -858,7 +852,6 @@ describe("session.compaction.process", () => {
parentID: msg.id,
messages: msgs,
sessionID: session.id,
- abort: abort.signal,
auto: false,
}),
),
@@ -892,6 +885,91 @@ describe("session.compaction.process", () => {
},
})
})
+
+ test("does not allow tool calls while generating the summary", async () => {
+ const stub = llm()
+ stub.push(
+ Stream.make(
+ { type: "start" } satisfies LLM.Event,
+ { type: "tool-input-start", id: "call-1", toolName: "_noop" } satisfies LLM.Event,
+ { type: "tool-call", toolCallId: "call-1", toolName: "_noop", input: {} } satisfies LLM.Event,
+ {
+ type: "finish-step",
+ finishReason: "tool-calls",
+ rawFinishReason: "tool_calls",
+ response: { id: "res", modelId: "test-model", timestamp: new Date() },
+ providerMetadata: undefined,
+ usage: {
+ inputTokens: 1,
+ outputTokens: 1,
+ totalTokens: 2,
+ inputTokenDetails: {
+ noCacheTokens: undefined,
+ cacheReadTokens: undefined,
+ cacheWriteTokens: undefined,
+ },
+ outputTokenDetails: {
+ textTokens: undefined,
+ reasoningTokens: undefined,
+ },
+ },
+ } satisfies LLM.Event,
+ {
+ type: "finish",
+ finishReason: "tool-calls",
+ rawFinishReason: "tool_calls",
+ totalUsage: {
+ inputTokens: 1,
+ outputTokens: 1,
+ totalTokens: 2,
+ inputTokenDetails: {
+ noCacheTokens: undefined,
+ cacheReadTokens: undefined,
+ cacheWriteTokens: undefined,
+ },
+ outputTokenDetails: {
+ textTokens: undefined,
+ reasoningTokens: undefined,
+ },
+ },
+ } satisfies LLM.Event,
+ ),
+ )
+
+ await using tmp = await tmpdir({ git: true })
+ await Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 }))
+
+ const session = await Session.create({})
+ const msg = await user(session.id, "hello")
+ const rt = liveRuntime(stub.layer)
+ try {
+ const msgs = await Session.messages({ sessionID: session.id })
+ await rt.runPromise(
+ SessionCompaction.Service.use((svc) =>
+ svc.process({
+ parentID: msg.id,
+ messages: msgs,
+ sessionID: session.id,
+ auto: false,
+ }),
+ ),
+ )
+
+ const summary = (await Session.messages({ sessionID: session.id })).find(
+ (item) => item.info.role === "assistant" && item.info.summary,
+ )
+
+ expect(summary?.info.role).toBe("assistant")
+ expect(summary?.parts.some((part) => part.type === "tool")).toBe(false)
+ } finally {
+ await rt.dispose()
+ }
+ },
+ })
+ })
})
describe("util.token.estimate", () => {
diff --git a/packages/opencode/test/session/llm.test.ts b/packages/opencode/test/session/llm.test.ts
index 8de7d2723a9b..bb81aa681c9f 100644
--- a/packages/opencode/test/session/llm.test.ts
+++ b/packages/opencode/test/session/llm.test.ts
@@ -1,7 +1,9 @@
import { afterAll, beforeAll, beforeEach, describe, expect, test } from "bun:test"
import path from "path"
import { tool, type ModelMessage } from "ai"
+import { Cause, Exit, Stream } from "effect"
import z from "zod"
+import { makeRuntime } from "../../src/effect/run-service"
import { LLM } from "../../src/session/llm"
import { Instance } from "../../src/project/instance"
import { Provider } from "../../src/provider/provider"
@@ -109,7 +111,11 @@ type Capture = {
const state = {
  server: null as ReturnType<typeof Bun.serve> | null,
- queue: [] as Array<{ path: string; response: Response; resolve: (value: Capture) => void }>,
+ queue: [] as Array<{
+ path: string
+ response: Response | ((req: Request, capture: Capture) => Response)
+ resolve: (value: Capture) => void
+ }>,
}
function deferred<T>() {
@@ -126,6 +132,58 @@ function waitRequest(pathname: string, response: Response) {
return pending.promise
}
+function timeout(ms: number) {
+  return new Promise<never>((_, reject) => {
+ setTimeout(() => reject(new Error(`timed out after ${ms}ms`)), ms)
+ })
+}
+
+function waitStreamingRequest(pathname: string) {
+  const request = deferred<Capture>()
+  const requestAborted = deferred<void>()
+  const responseCanceled = deferred<void>()
+ const encoder = new TextEncoder()
+
+ state.queue.push({
+ path: pathname,
+ resolve: request.resolve,
+ response(req: Request) {
+ req.signal.addEventListener("abort", () => requestAborted.resolve(), { once: true })
+
+ return new Response(
+ new ReadableStream({
+ start(controller) {
+ controller.enqueue(
+ encoder.encode(
+ [
+ `data: ${JSON.stringify({
+ id: "chatcmpl-abort",
+ object: "chat.completion.chunk",
+ choices: [{ delta: { role: "assistant" } }],
+ })}`,
+ ].join("\n\n") + "\n\n",
+ ),
+ )
+ },
+ cancel() {
+ responseCanceled.resolve()
+ },
+ }),
+ {
+ status: 200,
+ headers: { "Content-Type": "text/event-stream" },
+ },
+ )
+ },
+ })
+
+ return {
+ request: request.promise,
+ requestAborted: requestAborted.promise,
+ responseCanceled: responseCanceled.promise,
+ }
+}
+
beforeAll(() => {
state.server = Bun.serve({
port: 0,
@@ -143,7 +201,9 @@ beforeAll(() => {
return new Response("not found", { status: 404 })
}
- return next.response
+ return typeof next.response === "function"
+ ? next.response(req, { url, headers: req.headers, body })
+ : next.response
},
})
})
@@ -325,6 +385,162 @@ describe("session.llm.stream", () => {
})
})
+ test("raw stream abort signal cancels provider response body promptly", async () => {
+ const server = state.server
+ if (!server) throw new Error("Server not initialized")
+
+ const providerID = "alibaba"
+ const modelID = "qwen-plus"
+ const fixture = await loadFixture(providerID, modelID)
+ const model = fixture.model
+ const pending = waitStreamingRequest("/chat/completions")
+
+ await using tmp = await tmpdir({
+ init: async (dir) => {
+ await Bun.write(
+ path.join(dir, "opencode.json"),
+ JSON.stringify({
+ $schema: "https://opencode.ai/config.json",
+ enabled_providers: [providerID],
+ provider: {
+ [providerID]: {
+ options: {
+ apiKey: "test-key",
+ baseURL: `${server.url.origin}/v1`,
+ },
+ },
+ },
+ }),
+ )
+ },
+ })
+
+ await Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ const resolved = await Provider.getModel(ProviderID.make(providerID), ModelID.make(model.id))
+ const sessionID = SessionID.make("session-test-raw-abort")
+ const agent = {
+ name: "test",
+ mode: "primary",
+ options: {},
+ permission: [{ permission: "*", pattern: "*", action: "allow" }],
+ } satisfies Agent.Info
+ const user = {
+ id: MessageID.make("user-raw-abort"),
+ sessionID,
+ role: "user",
+ time: { created: Date.now() },
+ agent: agent.name,
+ model: { providerID: ProviderID.make(providerID), modelID: resolved.id },
+ } satisfies MessageV2.User
+
+ const ctrl = new AbortController()
+ const result = await LLM.stream({
+ user,
+ sessionID,
+ model: resolved,
+ agent,
+ system: ["You are a helpful assistant."],
+ abort: ctrl.signal,
+ messages: [{ role: "user", content: "Hello" }],
+ tools: {},
+ })
+
+ const iter = result.fullStream[Symbol.asyncIterator]()
+ await pending.request
+ await iter.next()
+ ctrl.abort()
+
+ await Promise.race([pending.responseCanceled, timeout(500)])
+ await Promise.race([pending.requestAborted, timeout(500)]).catch(() => undefined)
+ await iter.return?.()
+ },
+ })
+ })
+
+ test("service stream cancellation cancels provider response body promptly", async () => {
+ const server = state.server
+ if (!server) throw new Error("Server not initialized")
+
+ const providerID = "alibaba"
+ const modelID = "qwen-plus"
+ const fixture = await loadFixture(providerID, modelID)
+ const model = fixture.model
+ const pending = waitStreamingRequest("/chat/completions")
+
+ await using tmp = await tmpdir({
+ init: async (dir) => {
+ await Bun.write(
+ path.join(dir, "opencode.json"),
+ JSON.stringify({
+ $schema: "https://opencode.ai/config.json",
+ enabled_providers: [providerID],
+ provider: {
+ [providerID]: {
+ options: {
+ apiKey: "test-key",
+ baseURL: `${server.url.origin}/v1`,
+ },
+ },
+ },
+ }),
+ )
+ },
+ })
+
+ await Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ const resolved = await Provider.getModel(ProviderID.make(providerID), ModelID.make(model.id))
+ const sessionID = SessionID.make("session-test-service-abort")
+ const agent = {
+ name: "test",
+ mode: "primary",
+ options: {},
+ permission: [{ permission: "*", pattern: "*", action: "allow" }],
+ } satisfies Agent.Info
+ const user = {
+ id: MessageID.make("user-service-abort"),
+ sessionID,
+ role: "user",
+ time: { created: Date.now() },
+ agent: agent.name,
+ model: { providerID: ProviderID.make(providerID), modelID: resolved.id },
+ } satisfies MessageV2.User
+
+ const ctrl = new AbortController()
+ const { runPromiseExit } = makeRuntime(LLM.Service, LLM.defaultLayer)
+ const run = runPromiseExit(
+ (svc) =>
+ svc
+ .stream({
+ user,
+ sessionID,
+ model: resolved,
+ agent,
+ system: ["You are a helpful assistant."],
+ messages: [{ role: "user", content: "Hello" }],
+ tools: {},
+ })
+ .pipe(Stream.runDrain),
+ { signal: ctrl.signal },
+ )
+
+ await pending.request
+ ctrl.abort()
+
+ await Promise.race([pending.responseCanceled, timeout(500)])
+ const exit = await run
+ expect(Exit.isFailure(exit)).toBe(true)
+ if (Exit.isFailure(exit)) {
+ expect(Cause.hasInterrupts(exit.cause)).toBe(true)
+ }
+ await Promise.race([pending.requestAborted, timeout(500)]).catch(() => undefined)
+ },
+ })
+ })
+
test("keeps tools enabled by prompt permissions", async () => {
const server = state.server
if (!server) {
diff --git a/packages/opencode/test/session/processor-effect.test.ts b/packages/opencode/test/session/processor-effect.test.ts
index cd9d97e15fdd..0dfdef26f61e 100644
--- a/packages/opencode/test/session/processor-effect.test.ts
+++ b/packages/opencode/test/session/processor-effect.test.ts
@@ -1,7 +1,7 @@
import { NodeFileSystem } from "@effect/platform-node"
import { expect } from "bun:test"
import { APICallError } from "ai"
-import { Effect, Layer, ServiceMap } from "effect"
+import { Cause, Effect, Exit, Fiber, Layer, ServiceMap } from "effect"
import * as Stream from "effect/Stream"
import path from "path"
import type { Agent } from "../../src/agent/agent"
@@ -10,7 +10,6 @@ import { Bus } from "../../src/bus"
import { Config } from "../../src/config/config"
import { Permission } from "../../src/permission"
import { Plugin } from "../../src/plugin"
-import { Instance } from "../../src/project/instance"
import type { Provider } from "../../src/provider/provider"
import { ModelID, ProviderID } from "../../src/provider/schema"
import { Session } from "../../src/session"
@@ -120,21 +119,8 @@ function fail(err: E, ...items: LLM.Event[]) {
return stream(...items).pipe(Stream.concat(Stream.fail(err)))
}
-function wait(abort: AbortSignal) {
- return Effect.promise(
- () =>
- new Promise((done) => {
- abort.addEventListener("abort", () => done(), { once: true })
- }),
- )
-}
-
-function hang(input: LLM.StreamInput, ...items: LLM.Event[]) {
- return stream(...items).pipe(
- Stream.concat(
- Stream.unwrap(wait(input.abort).pipe(Effect.as(Stream.fail(new DOMException("Aborted", "AbortError"))))),
- ),
- )
+function hang(_input: LLM.StreamInput, ...items: LLM.Event[]) {
+ return stream(...items).pipe(Stream.concat(Stream.fromEffect(Effect.never)))
}
function model(context: number): Provider.Model {
@@ -291,13 +277,11 @@ it.effect("session.processor effect tests capture llm input cleanly", () => {
const chat = yield* session.create({})
const parent = yield* user(chat.id, "hi")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
- const abort = new AbortController()
const mdl = model(100)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
- abort: abort.signal,
})
const input = {
@@ -313,7 +297,6 @@ it.effect("session.processor effect tests capture llm input cleanly", () => {
model: mdl,
agent: agent(),
system: [],
- abort: abort.signal,
messages: [{ role: "user", content: "hi" }],
tools: {},
} satisfies LLM.StreamInput
@@ -359,13 +342,11 @@ it.effect("session.processor effect tests stop after token overflow requests com
const chat = yield* session.create({})
const parent = yield* user(chat.id, "compact")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
- const abort = new AbortController()
const mdl = model(20)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
- abort: abort.signal,
})
const value = yield* handle.process({
@@ -381,7 +362,6 @@ it.effect("session.processor effect tests stop after token overflow requests com
model: mdl,
agent: agent(),
system: [],
- abort: abort.signal,
messages: [{ role: "user", content: "compact" }],
tools: {},
})
@@ -433,13 +413,11 @@ it.effect("session.processor effect tests reset reasoning state across retries",
const chat = yield* session.create({})
const parent = yield* user(chat.id, "reason")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
- const abort = new AbortController()
const mdl = model(100)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
- abort: abort.signal,
})
const value = yield* handle.process({
@@ -455,7 +433,6 @@ it.effect("session.processor effect tests reset reasoning state across retries",
model: mdl,
agent: agent(),
system: [],
- abort: abort.signal,
messages: [{ role: "user", content: "reason" }],
tools: {},
})
@@ -485,13 +462,11 @@ it.effect("session.processor effect tests do not retry unknown json errors", ()
const chat = yield* session.create({})
const parent = yield* user(chat.id, "json")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
- const abort = new AbortController()
const mdl = model(100)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
- abort: abort.signal,
})
const value = yield* handle.process({
@@ -507,7 +482,6 @@ it.effect("session.processor effect tests do not retry unknown json errors", ()
model: mdl,
agent: agent(),
system: [],
- abort: abort.signal,
messages: [{ role: "user", content: "json" }],
tools: {},
})
@@ -535,13 +509,11 @@ it.effect("session.processor effect tests retry recognized structured json error
const chat = yield* session.create({})
const parent = yield* user(chat.id, "retry json")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
- const abort = new AbortController()
const mdl = model(100)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
- abort: abort.signal,
})
const value = yield* handle.process({
@@ -557,7 +529,6 @@ it.effect("session.processor effect tests retry recognized structured json error
model: mdl,
agent: agent(),
system: [],
- abort: abort.signal,
messages: [{ role: "user", content: "retry json" }],
tools: {},
})
@@ -601,7 +572,6 @@ it.effect("session.processor effect tests publish retry status updates", () => {
const chat = yield* session.create({})
const parent = yield* user(chat.id, "retry")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
- const abort = new AbortController()
const mdl = model(100)
const states: number[] = []
const off = yield* bus.subscribeCallback(SessionStatus.Event.Status, (evt) => {
@@ -612,7 +582,6 @@ it.effect("session.processor effect tests publish retry status updates", () => {
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
- abort: abort.signal,
})
const value = yield* handle.process({
@@ -628,7 +597,6 @@ it.effect("session.processor effect tests publish retry status updates", () => {
model: mdl,
agent: agent(),
system: [],
- abort: abort.signal,
messages: [{ role: "user", content: "retry" }],
tools: {},
})
@@ -656,13 +624,11 @@ it.effect("session.processor effect tests compact on structured context overflow
const chat = yield* session.create({})
const parent = yield* user(chat.id, "compact json")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
- const abort = new AbortController()
const mdl = model(100)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
- abort: abort.signal,
})
const value = yield* handle.process({
@@ -678,7 +644,6 @@ it.effect("session.processor effect tests compact on structured context overflow
model: mdl,
agent: agent(),
system: [],
- abort: abort.signal,
messages: [{ role: "user", content: "compact json" }],
tools: {},
})
@@ -696,7 +661,6 @@ it.effect("session.processor effect tests mark pending tools as aborted on clean
(dir) =>
Effect.gen(function* () {
const ready = defer()
- const seen = defer()
const test = yield* TestLLM
const processors = yield* SessionProcessor.Service
const session = yield* Session.Service
@@ -710,17 +674,15 @@ it.effect("session.processor effect tests mark pending tools as aborted on clean
const chat = yield* session.create({})
const parent = yield* user(chat.id, "tool abort")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
- const abort = new AbortController()
const mdl = model(100)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
- abort: abort.signal,
})
- const run = Effect.runPromise(
- handle.process({
+ const run = yield* handle
+ .process({
user: {
id: parent.id,
sessionID: chat.id,
@@ -733,20 +695,25 @@ it.effect("session.processor effect tests mark pending tools as aborted on clean
model: mdl,
agent: agent(),
system: [],
- abort: abort.signal,
messages: [{ role: "user", content: "tool abort" }],
tools: {},
- }),
- )
+ })
+ .pipe(Effect.forkChild)
yield* Effect.promise(() => ready.promise)
- abort.abort()
+ yield* Fiber.interrupt(run)
- const value = yield* Effect.promise(() => run)
+ const exit = yield* Fiber.await(run)
+ if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) {
+ yield* handle.abort()
+ }
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
const tool = parts.find((part): part is MessageV2.ToolPart => part.type === "tool")
- expect(value).toBe("stop")
+ expect(Exit.isFailure(exit)).toBe(true)
+ if (Exit.isFailure(exit)) {
+ expect(Cause.hasInterruptsOnly(exit.cause)).toBe(true)
+ }
expect(yield* test.calls).toBe(1)
expect(tool?.state.status).toBe("error")
if (tool?.state.status === "error") {
@@ -779,7 +746,6 @@ it.effect("session.processor effect tests record aborted errors and idle state",
const chat = yield* session.create({})
const parent = yield* user(chat.id, "abort")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
- const abort = new AbortController()
const mdl = model(100)
const errs: string[] = []
const off = yield* bus.subscribeCallback(Session.Event.Error, (evt) => {
@@ -792,11 +758,10 @@ it.effect("session.processor effect tests record aborted errors and idle state",
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
- abort: abort.signal,
})
- const run = Effect.runPromise(
- handle.process({
+ const run = yield* handle
+ .process({
user: {
id: parent.id,
sessionID: chat.id,
@@ -809,22 +774,27 @@ it.effect("session.processor effect tests record aborted errors and idle state",
model: mdl,
agent: agent(),
system: [],
- abort: abort.signal,
messages: [{ role: "user", content: "abort" }],
tools: {},
- }),
- )
+ })
+ .pipe(Effect.forkChild)
yield* Effect.promise(() => ready.promise)
- abort.abort()
+ yield* Fiber.interrupt(run)
- const value = yield* Effect.promise(() => run)
+ const exit = yield* Fiber.await(run)
+ if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) {
+ yield* handle.abort()
+ }
yield* Effect.promise(() => seen.promise)
const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id }))
const state = yield* status.get(chat.id)
off()
- expect(value).toBe("stop")
+ expect(Exit.isFailure(exit)).toBe(true)
+ if (Exit.isFailure(exit)) {
+ expect(Cause.hasInterruptsOnly(exit.cause)).toBe(true)
+ }
expect(handle.message.error?.name).toBe("MessageAbortedError")
expect(stored.info.role).toBe("assistant")
if (stored.info.role === "assistant") {
@@ -836,3 +806,67 @@ it.effect("session.processor effect tests record aborted errors and idle state",
{ git: true },
)
})
+
+it.effect("session.processor effect tests mark interruptions aborted without manual abort", () => {
+ return provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const ready = defer()
+ const processors = yield* SessionProcessor.Service
+ const session = yield* Session.Service
+ const status = yield* SessionStatus.Service
+ const test = yield* TestLLM
+
+ yield* test.push((input) =>
+ hang(input, start()).pipe(
+ Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
+ ),
+ )
+
+ const chat = yield* session.create({})
+ const parent = yield* user(chat.id, "interrupt")
+ const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
+ const mdl = model(100)
+ const handle = yield* processors.create({
+ assistantMessage: msg,
+ sessionID: chat.id,
+ model: mdl,
+ })
+
+ const run = yield* handle
+ .process({
+ user: {
+ id: parent.id,
+ sessionID: chat.id,
+ role: "user",
+ time: parent.time,
+ agent: parent.agent,
+ model: { providerID: ref.providerID, modelID: ref.modelID },
+ } satisfies MessageV2.User,
+ sessionID: chat.id,
+ model: mdl,
+ agent: agent(),
+ system: [],
+ messages: [{ role: "user", content: "interrupt" }],
+ tools: {},
+ })
+ .pipe(Effect.forkChild)
+
+ yield* Effect.promise(() => ready.promise)
+ yield* Fiber.interrupt(run)
+
+ const exit = yield* Fiber.await(run)
+ const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id }))
+ const state = yield* status.get(chat.id)
+
+ expect(Exit.isFailure(exit)).toBe(true)
+ expect(handle.message.error?.name).toBe("MessageAbortedError")
+ expect(stored.info.role).toBe("assistant")
+ if (stored.info.role === "assistant") {
+ expect(stored.info.error?.name).toBe("MessageAbortedError")
+ }
+ expect(state).toMatchObject({ type: "idle" })
+ }),
+ { git: true },
+ )
+})
diff --git a/packages/opencode/test/session/prompt-concurrency.test.ts b/packages/opencode/test/session/prompt-concurrency.test.ts
new file mode 100644
index 000000000000..19e1c4bf59cf
--- /dev/null
+++ b/packages/opencode/test/session/prompt-concurrency.test.ts
@@ -0,0 +1,247 @@
+import { describe, expect, spyOn, test } from "bun:test"
+import { Instance } from "../../src/project/instance"
+import { Provider } from "../../src/provider/provider"
+import { Session } from "../../src/session"
+import { MessageV2 } from "../../src/session/message-v2"
+import { SessionPrompt } from "../../src/session/prompt"
+import { SessionStatus } from "../../src/session/status"
+import { MessageID, PartID, SessionID } from "../../src/session/schema"
+import { Log } from "../../src/util/log"
+import { tmpdir } from "../fixture/fixture"
+
+Log.init({ print: false })
+
+function deferred() {
+  let resolve!: () => void
+  const promise = new Promise<void>((done) => {
+    resolve = done
+  })
+  return { promise, resolve }
+}
+
+// Helper: seed a session with a user message + finished assistant message
+// so loop() exits immediately without calling any LLM
+async function seed(sessionID: SessionID) {
+ const userMsg: MessageV2.Info = {
+ id: MessageID.ascending(),
+ role: "user",
+ sessionID,
+ time: { created: Date.now() },
+ agent: "build",
+ model: { providerID: "openai" as any, modelID: "gpt-5.2" as any },
+ }
+ await Session.updateMessage(userMsg)
+ await Session.updatePart({
+ id: PartID.ascending(),
+ messageID: userMsg.id,
+ sessionID,
+ type: "text",
+ text: "hello",
+ })
+
+ const assistantMsg: MessageV2.Info = {
+ id: MessageID.ascending(),
+ role: "assistant",
+ parentID: userMsg.id,
+ sessionID,
+ mode: "build",
+ agent: "build",
+ cost: 0,
+ path: { cwd: "/tmp", root: "/tmp" },
+ tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
+ modelID: "gpt-5.2" as any,
+ providerID: "openai" as any,
+ time: { created: Date.now(), completed: Date.now() },
+ finish: "stop",
+ }
+ await Session.updateMessage(assistantMsg)
+ await Session.updatePart({
+ id: PartID.ascending(),
+ messageID: assistantMsg.id,
+ sessionID,
+ type: "text",
+ text: "hi there",
+ })
+
+ return { userMsg, assistantMsg }
+}
+
+describe("session.prompt concurrency", () => {
+ test("loop returns assistant message and sets status to idle", async () => {
+ await using tmp = await tmpdir({ git: true })
+ await Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ const session = await Session.create({})
+ await seed(session.id)
+
+ const result = await SessionPrompt.loop({ sessionID: session.id })
+ expect(result.info.role).toBe("assistant")
+ if (result.info.role === "assistant") expect(result.info.finish).toBe("stop")
+
+ const status = await SessionStatus.get(session.id)
+ expect(status.type).toBe("idle")
+ },
+ })
+ })
+
+ test("concurrent loop callers get the same result", async () => {
+ await using tmp = await tmpdir({ git: true })
+ await Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ const session = await Session.create({})
+ await seed(session.id)
+
+ const [a, b] = await Promise.all([
+ SessionPrompt.loop({ sessionID: session.id }),
+ SessionPrompt.loop({ sessionID: session.id }),
+ ])
+
+ expect(a.info.id).toBe(b.info.id)
+ expect(a.info.role).toBe("assistant")
+ },
+ })
+ })
+
+ test("assertNotBusy throws when loop is running", async () => {
+ await using tmp = await tmpdir({ git: true })
+ await Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ const session = await Session.create({})
+ const userMsg: MessageV2.Info = {
+ id: MessageID.ascending(),
+ role: "user",
+ sessionID: session.id,
+ time: { created: Date.now() },
+ agent: "build",
+ model: { providerID: "openai" as any, modelID: "gpt-5.2" as any },
+ }
+ await Session.updateMessage(userMsg)
+ await Session.updatePart({
+ id: PartID.ascending(),
+ messageID: userMsg.id,
+ sessionID: session.id,
+ type: "text",
+ text: "hello",
+ })
+
+ const ready = deferred()
+ const gate = deferred()
+ const getModel = spyOn(Provider, "getModel").mockImplementation(async () => {
+ ready.resolve()
+ await gate.promise
+ throw new Error("test stop")
+ })
+
+ try {
+ const loopPromise = SessionPrompt.loop({ sessionID: session.id }).catch(() => undefined)
+ await ready.promise
+
+ await expect(SessionPrompt.assertNotBusy(session.id)).rejects.toBeInstanceOf(Session.BusyError)
+
+ gate.resolve()
+ await loopPromise
+ } finally {
+ gate.resolve()
+ getModel.mockRestore()
+ }
+
+ // After loop completes, assertNotBusy should succeed
+ await SessionPrompt.assertNotBusy(session.id)
+ },
+ })
+ })
+
+ test("cancel sets status to idle", async () => {
+ await using tmp = await tmpdir({ git: true })
+ await Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ const session = await Session.create({})
+ // Seed only a user message — loop must call getModel to proceed
+ const userMsg: MessageV2.Info = {
+ id: MessageID.ascending(),
+ role: "user",
+ sessionID: session.id,
+ time: { created: Date.now() },
+ agent: "build",
+ model: { providerID: "openai" as any, modelID: "gpt-5.2" as any },
+ }
+ await Session.updateMessage(userMsg)
+ await Session.updatePart({
+ id: PartID.ascending(),
+ messageID: userMsg.id,
+ sessionID: session.id,
+ type: "text",
+ text: "hello",
+ })
+ // Also seed an assistant message so lastAssistant() fallback can find it
+ const assistantMsg: MessageV2.Info = {
+ id: MessageID.ascending(),
+ role: "assistant",
+ parentID: userMsg.id,
+ sessionID: session.id,
+ mode: "build",
+ agent: "build",
+ cost: 0,
+ path: { cwd: "/tmp", root: "/tmp" },
+ tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
+ modelID: "gpt-5.2" as any,
+ providerID: "openai" as any,
+ time: { created: Date.now() },
+ }
+ await Session.updateMessage(assistantMsg)
+ await Session.updatePart({
+ id: PartID.ascending(),
+ messageID: assistantMsg.id,
+ sessionID: session.id,
+ type: "text",
+ text: "hi there",
+ })
+
+ const ready = deferred()
+ const gate = deferred()
+ const getModel = spyOn(Provider, "getModel").mockImplementation(async () => {
+ ready.resolve()
+ await gate.promise
+ throw new Error("test stop")
+ })
+
+ try {
+ // Start loop — it will block in getModel (assistant has no finish, so loop continues)
+ const loopPromise = SessionPrompt.loop({ sessionID: session.id })
+
+ await ready.promise
+
+ await SessionPrompt.cancel(session.id)
+
+ const status = await SessionStatus.get(session.id)
+ expect(status.type).toBe("idle")
+
+ // loop should resolve cleanly, not throw "All fibers interrupted"
+ const result = await loopPromise
+ expect(result.info.role).toBe("assistant")
+ expect(result.info.id).toBe(assistantMsg.id)
+ } finally {
+ gate.resolve()
+ getModel.mockRestore()
+ }
+ },
+ })
+ }, 10000)
+
+ test("cancel on idle session just sets idle", async () => {
+ await using tmp = await tmpdir({ git: true })
+ await Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ const session = await Session.create({})
+ await SessionPrompt.cancel(session.id)
+ const status = await SessionStatus.get(session.id)
+ expect(status.type).toBe("idle")
+ },
+ })
+ })
+})
diff --git a/packages/opencode/test/session/prompt-effect.test.ts b/packages/opencode/test/session/prompt-effect.test.ts
new file mode 100644
index 000000000000..9f35a21f4a4e
--- /dev/null
+++ b/packages/opencode/test/session/prompt-effect.test.ts
@@ -0,0 +1,1140 @@
+import { NodeFileSystem } from "@effect/platform-node"
+import { expect } from "bun:test"
+import { Cause, Effect, Exit, Fiber, Layer, ServiceMap } from "effect"
+import * as Stream from "effect/Stream"
+import type { Agent } from "../../src/agent/agent"
+import { Agent as AgentSvc } from "../../src/agent/agent"
+import { Bus } from "../../src/bus"
+import { Command } from "../../src/command"
+import { Config } from "../../src/config/config"
+import { FileTime } from "../../src/file/time"
+import { LSP } from "../../src/lsp"
+import { MCP } from "../../src/mcp"
+import { Permission } from "../../src/permission"
+import { Plugin } from "../../src/plugin"
+import type { Provider } from "../../src/provider/provider"
+import { ModelID, ProviderID } from "../../src/provider/schema"
+import { Session } from "../../src/session"
+import { LLM } from "../../src/session/llm"
+import { MessageV2 } from "../../src/session/message-v2"
+import { AppFileSystem } from "../../src/filesystem"
+import { SessionCompaction } from "../../src/session/compaction"
+import { SessionProcessor } from "../../src/session/processor"
+import { SessionPrompt } from "../../src/session/prompt"
+import { MessageID, PartID, SessionID } from "../../src/session/schema"
+import { SessionStatus } from "../../src/session/status"
+import { Shell } from "../../src/shell/shell"
+import { Snapshot } from "../../src/snapshot"
+import { ToolRegistry } from "../../src/tool/registry"
+import { Truncate } from "../../src/tool/truncate"
+import { Log } from "../../src/util/log"
+import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner"
+import { provideTmpdirInstance } from "../fixture/fixture"
+import { testEffect } from "../lib/effect"
+
+Log.init({ print: false })
+
+const ref = {
+ providerID: ProviderID.make("test"),
+ modelID: ModelID.make("test-model"),
+}
+
+type Script = Stream.Stream<LLM.Event> | ((input: LLM.StreamInput) => Stream.Stream<LLM.Event>)
+
+class TestLLM extends ServiceMap.Service<
+  TestLLM,
+  {
+    readonly push: (stream: Script) => Effect.Effect<void>
+    readonly reply: (...items: LLM.Event[]) => Effect.Effect<void>
+    readonly calls: Effect.Effect<number>
+    readonly inputs: Effect.Effect<LLM.StreamInput[]>
+  }
+>()("@test/PromptLLM") {}
+
+function stream(...items: LLM.Event[]) {
+ return Stream.make(...items)
+}
+
+function usage(input = 1, output = 1, total = input + output) {
+ return {
+ inputTokens: input,
+ outputTokens: output,
+ totalTokens: total,
+ inputTokenDetails: {
+ noCacheTokens: undefined,
+ cacheReadTokens: undefined,
+ cacheWriteTokens: undefined,
+ },
+ outputTokenDetails: {
+ textTokens: undefined,
+ reasoningTokens: undefined,
+ },
+ }
+}
+
+function start(): LLM.Event {
+ return { type: "start" }
+}
+
+function textStart(id = "t"): LLM.Event {
+ return { type: "text-start", id }
+}
+
+function textDelta(id: string, text: string): LLM.Event {
+ return { type: "text-delta", id, text }
+}
+
+function textEnd(id = "t"): LLM.Event {
+ return { type: "text-end", id }
+}
+
+function finishStep(): LLM.Event {
+ return {
+ type: "finish-step",
+ finishReason: "stop",
+ rawFinishReason: "stop",
+ response: { id: "res", modelId: "test-model", timestamp: new Date() },
+ providerMetadata: undefined,
+ usage: usage(),
+ }
+}
+
+function finish(): LLM.Event {
+ return { type: "finish", finishReason: "stop", rawFinishReason: "stop", totalUsage: usage() }
+}
+
+function finishToolCallsStep(): LLM.Event {
+ return {
+ type: "finish-step",
+ finishReason: "tool-calls",
+ rawFinishReason: "tool_calls",
+ response: { id: "res", modelId: "test-model", timestamp: new Date() },
+ providerMetadata: undefined,
+ usage: usage(),
+ }
+}
+
+function finishToolCalls(): LLM.Event {
+ return { type: "finish", finishReason: "tool-calls", rawFinishReason: "tool_calls", totalUsage: usage() }
+}
+
+function replyStop(text: string, id = "t") {
+ return [start(), textStart(id), textDelta(id, text), textEnd(id), finishStep(), finish()] as const
+}
+
+function replyToolCalls(text: string, id = "t") {
+ return [start(), textStart(id), textDelta(id, text), textEnd(id), finishToolCallsStep(), finishToolCalls()] as const
+}
+
+function toolInputStart(id: string, toolName: string): LLM.Event {
+ return { type: "tool-input-start", id, toolName }
+}
+
+function toolCall(toolCallId: string, toolName: string, input: unknown): LLM.Event {
+ return { type: "tool-call", toolCallId, toolName, input }
+}
+
+function hang(_input: LLM.StreamInput, ...items: LLM.Event[]) {
+ return stream(...items).pipe(Stream.concat(Stream.fromEffect(Effect.never)))
+}
+
+function defer<T = void>() {
+  let resolve!: (value: T | PromiseLike<T>) => void
+  const promise = new Promise<T>((done) => {
+    resolve = done
+  })
+  return { promise, resolve }
+}
+
+function waitMs(ms: number) {
+ return Effect.promise(() => new Promise((done) => setTimeout(done, ms)))
+}
+
+function withSh<A, E, R>(fx: () => Effect.Effect<A, E, R>) {
+ return Effect.acquireUseRelease(
+ Effect.sync(() => {
+ const prev = process.env.SHELL
+ process.env.SHELL = "/bin/sh"
+ Shell.preferred.reset()
+ return prev
+ }),
+ () => fx(),
+ (prev) =>
+ Effect.sync(() => {
+ if (prev === undefined) delete process.env.SHELL
+ else process.env.SHELL = prev
+ Shell.preferred.reset()
+ }),
+ )
+}
+
+function toolPart(parts: MessageV2.Part[]) {
+ return parts.find((part): part is MessageV2.ToolPart => part.type === "tool")
+}
+
+type CompletedToolPart = MessageV2.ToolPart & { state: MessageV2.ToolStateCompleted }
+type ErrorToolPart = MessageV2.ToolPart & { state: MessageV2.ToolStateError }
+
+function completedTool(parts: MessageV2.Part[]) {
+ const part = toolPart(parts)
+ expect(part?.state.status).toBe("completed")
+ return part?.state.status === "completed" ? (part as CompletedToolPart) : undefined
+}
+
+function errorTool(parts: MessageV2.Part[]) {
+ const part = toolPart(parts)
+ expect(part?.state.status).toBe("error")
+ return part?.state.status === "error" ? (part as ErrorToolPart) : undefined
+}
+
+const llm = Layer.unwrap(
+ Effect.gen(function* () {
+ const queue: Script[] = []
+ const inputs: LLM.StreamInput[] = []
+ let calls = 0
+
+ const push = Effect.fn("TestLLM.push")((item: Script) => {
+ queue.push(item)
+ return Effect.void
+ })
+
+ const reply = Effect.fn("TestLLM.reply")((...items: LLM.Event[]) => push(stream(...items)))
+ return Layer.mergeAll(
+ Layer.succeed(
+ LLM.Service,
+ LLM.Service.of({
+ stream: (input) => {
+ calls += 1
+ inputs.push(input)
+ const item = queue.shift() ?? Stream.empty
+ return typeof item === "function" ? item(input) : item
+ },
+ }),
+ ),
+ Layer.succeed(
+ TestLLM,
+ TestLLM.of({
+ push,
+ reply,
+ calls: Effect.sync(() => calls),
+ inputs: Effect.sync(() => [...inputs]),
+ }),
+ ),
+ )
+ }),
+)
+
+const mcp = Layer.succeed(
+ MCP.Service,
+ MCP.Service.of({
+ status: () => Effect.succeed({}),
+ clients: () => Effect.succeed({}),
+ tools: () => Effect.succeed({}),
+ prompts: () => Effect.succeed({}),
+ resources: () => Effect.succeed({}),
+ add: () => Effect.succeed({ status: { status: "disabled" as const } }),
+ connect: () => Effect.void,
+ disconnect: () => Effect.void,
+ getPrompt: () => Effect.succeed(undefined),
+ readResource: () => Effect.succeed(undefined),
+ startAuth: () => Effect.die("unexpected MCP auth in prompt-effect tests"),
+ authenticate: () => Effect.die("unexpected MCP auth in prompt-effect tests"),
+ finishAuth: () => Effect.die("unexpected MCP auth in prompt-effect tests"),
+ removeAuth: () => Effect.void,
+ supportsOAuth: () => Effect.succeed(false),
+ hasStoredTokens: () => Effect.succeed(false),
+ getAuthStatus: () => Effect.succeed("not_authenticated" as const),
+ }),
+)
+
+const lsp = Layer.succeed(
+ LSP.Service,
+ LSP.Service.of({
+ init: () => Effect.void,
+ status: () => Effect.succeed([]),
+ hasClients: () => Effect.succeed(false),
+ touchFile: () => Effect.void,
+ diagnostics: () => Effect.succeed({}),
+ hover: () => Effect.succeed(undefined),
+ definition: () => Effect.succeed([]),
+ references: () => Effect.succeed([]),
+ implementation: () => Effect.succeed([]),
+ documentSymbol: () => Effect.succeed([]),
+ workspaceSymbol: () => Effect.succeed([]),
+ prepareCallHierarchy: () => Effect.succeed([]),
+ incomingCalls: () => Effect.succeed([]),
+ outgoingCalls: () => Effect.succeed([]),
+ }),
+)
+
+const filetime = Layer.succeed(
+ FileTime.Service,
+ FileTime.Service.of({
+ read: () => Effect.void,
+ get: () => Effect.succeed(undefined),
+ assert: () => Effect.void,
+ withLock: (_filepath, fn) => Effect.promise(fn),
+ }),
+)
+
+const status = SessionStatus.layer.pipe(Layer.provideMerge(Bus.layer))
+const infra = Layer.mergeAll(NodeFileSystem.layer, CrossSpawnSpawner.defaultLayer)
+const deps = Layer.mergeAll(
+ Session.defaultLayer,
+ Snapshot.defaultLayer,
+ AgentSvc.defaultLayer,
+ Command.defaultLayer,
+ Permission.layer,
+ Plugin.defaultLayer,
+ Config.defaultLayer,
+ filetime,
+ lsp,
+ mcp,
+ AppFileSystem.defaultLayer,
+ status,
+ llm,
+).pipe(Layer.provideMerge(infra))
+const registry = ToolRegistry.layer.pipe(Layer.provideMerge(deps))
+const trunc = Truncate.layer.pipe(Layer.provideMerge(deps))
+const proc = SessionProcessor.layer.pipe(Layer.provideMerge(deps))
+const compact = SessionCompaction.layer.pipe(Layer.provideMerge(proc), Layer.provideMerge(deps))
+const env = SessionPrompt.layer.pipe(
+ Layer.provideMerge(compact),
+ Layer.provideMerge(proc),
+ Layer.provideMerge(registry),
+ Layer.provideMerge(trunc),
+ Layer.provideMerge(deps),
+)
+
+const it = testEffect(env)
+const unix = process.platform !== "win32" ? it.effect : it.effect.skip
+
+// Config that registers a custom "test" provider with a "test-model" model
+// so Provider.getModel("test", "test-model") succeeds inside the loop.
+const cfg = {
+ provider: {
+ test: {
+ name: "Test",
+ id: "test",
+ env: [],
+ npm: "@ai-sdk/openai-compatible",
+ models: {
+ "test-model": {
+ id: "test-model",
+ name: "Test Model",
+ attachment: false,
+ reasoning: false,
+ temperature: false,
+ tool_call: true,
+ release_date: "2025-01-01",
+ limit: { context: 100000, output: 10000 },
+ cost: { input: 0, output: 0 },
+ options: {},
+ },
+ },
+ options: {
+ apiKey: "test-key",
+ baseURL: "http://localhost:1/v1",
+ },
+ },
+ },
+}
+
+const user = Effect.fn("test.user")(function* (sessionID: SessionID, text: string) {
+ const session = yield* Session.Service
+ const msg = yield* session.updateMessage({
+ id: MessageID.ascending(),
+ role: "user",
+ sessionID,
+ agent: "build",
+ model: ref,
+ time: { created: Date.now() },
+ })
+ yield* session.updatePart({
+ id: PartID.ascending(),
+ messageID: msg.id,
+ sessionID,
+ type: "text",
+ text,
+ })
+ return msg
+})
+
+const seed = Effect.fn("test.seed")(function* (sessionID: SessionID, opts?: { finish?: string }) {
+ const session = yield* Session.Service
+ const msg = yield* user(sessionID, "hello")
+ const assistant: MessageV2.Assistant = {
+ id: MessageID.ascending(),
+ role: "assistant",
+ parentID: msg.id,
+ sessionID,
+ mode: "build",
+ agent: "build",
+ cost: 0,
+ path: { cwd: "/tmp", root: "/tmp" },
+ tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
+ modelID: ref.modelID,
+ providerID: ref.providerID,
+ time: { created: Date.now() },
+ ...(opts?.finish ? { finish: opts.finish } : {}),
+ }
+ yield* session.updateMessage(assistant)
+ yield* session.updatePart({
+ id: PartID.ascending(),
+ messageID: assistant.id,
+ sessionID,
+ type: "text",
+ text: "hi there",
+ })
+ return { user: msg, assistant }
+})
+
+const addSubtask = (sessionID: SessionID, messageID: MessageID, model = ref) =>
+ Effect.gen(function* () {
+ const session = yield* Session.Service
+ yield* session.updatePart({
+ id: PartID.ascending(),
+ messageID,
+ sessionID,
+ type: "subtask",
+ prompt: "look into the cache key path",
+ description: "inspect bug",
+ agent: "general",
+ model,
+ })
+ })
+
+// Helper: resolve the services most tests need and create a fresh session.
+// Defaults the session title to "Pinned" when no input is given.
+const boot = Effect.fn("test.boot")(function* (input?: { title?: string }) {
+  const test = yield* TestLLM
+  const prompt = yield* SessionPrompt.Service
+  const sessions = yield* Session.Service
+  const chat = yield* sessions.create(input ?? { title: "Pinned" })
+  return { test, prompt, sessions, chat }
+})
+
+// Loop semantics
+
+// When the newest assistant message already carries a finish reason, loop()
+// must return it as-is without invoking the LLM at all (calls stays 0).
+it.effect("loop exits immediately when last assistant has stop finish", () =>
+  provideTmpdirInstance(
+    (dir) =>
+      Effect.gen(function* () {
+        const { test, prompt, chat } = yield* boot()
+        yield* seed(chat.id, { finish: "stop" })
+
+        const result = yield* prompt.loop({ sessionID: chat.id })
+        expect(result.info.role).toBe("assistant")
+        if (result.info.role === "assistant") expect(result.info.finish).toBe("stop")
+        expect(yield* test.calls).toBe(0)
+      }),
+    { git: true },
+  ),
+)
+
+// Happy path: a pending user message triggers exactly one LLM call whose
+// text ends up on the returned assistant message.
+it.effect("loop calls LLM and returns assistant message", () =>
+  provideTmpdirInstance(
+    (dir) =>
+      Effect.gen(function* () {
+        const { test, prompt, chat } = yield* boot()
+        yield* test.reply(...replyStop("world"))
+        yield* user(chat.id, "hello")
+
+        const result = yield* prompt.loop({ sessionID: chat.id })
+        expect(result.info.role).toBe("assistant")
+        // NOTE(review): the filter already narrows to text parts, so the
+        // repeated `p.type === "text"` check inside some() is redundant.
+        const parts = result.parts.filter((p) => p.type === "text")
+        expect(parts.some((p) => p.type === "text" && p.text === "world")).toBe(true)
+        expect(yield* test.calls).toBe(1)
+      }),
+    { git: true, config: cfg },
+  ),
+)
+
+// A "tool-calls" finish must make the loop go around again; only the second
+// reply ("stop") terminates it, so the LLM is called twice.
+it.effect("loop continues when finish is tool-calls", () =>
+  provideTmpdirInstance(
+    (dir) =>
+      Effect.gen(function* () {
+        const { test, prompt, chat } = yield* boot()
+        yield* test.reply(...replyToolCalls("first"))
+        yield* test.reply(...replyStop("second"))
+        yield* user(chat.id, "hello")
+
+        const result = yield* prompt.loop({ sessionID: chat.id })
+        expect(yield* test.calls).toBe(2)
+        expect(result.info.role).toBe("assistant")
+        if (result.info.role === "assistant") {
+          expect(result.parts.some((part) => part.type === "text" && part.text === "second")).toBe(true)
+          expect(result.info.finish).toBe("stop")
+        }
+      }),
+    { git: true, config: cfg },
+  ),
+)
+
+// A subtask whose configured agent model does not exist must surface as an
+// error tool state while still preserving session/model metadata for the UI.
+it.effect("failed subtask preserves metadata on error tool state", () =>
+  provideTmpdirInstance(
+    (dir) =>
+      Effect.gen(function* () {
+        const { test, prompt, chat } = yield* boot({ title: "Pinned" })
+        // First LLM turn: the parent agent invokes the `task` tool.
+        yield* test.reply(
+          start(),
+          toolInputStart("task-1", "task"),
+          toolCall("task-1", "task", {
+            description: "inspect bug",
+            prompt: "look into the cache key path",
+            subagent_type: "general",
+          }),
+          {
+            type: "finish-step",
+            finishReason: "tool-calls",
+            rawFinishReason: "tool_calls",
+            response: { id: "res", modelId: "test-model", timestamp: new Date() },
+            providerMetadata: undefined,
+            usage: usage(),
+          },
+          { type: "finish", finishReason: "tool-calls", rawFinishReason: "tool_calls", totalUsage: usage() },
+        )
+        // Second turn: the parent agent wraps up after the tool errors.
+        yield* test.reply(...replyStop("done"))
+        const msg = yield* user(chat.id, "hello")
+        yield* addSubtask(chat.id, msg.id)
+
+        const result = yield* prompt.loop({ sessionID: chat.id })
+        expect(result.info.role).toBe("assistant")
+        expect(yield* test.calls).toBe(2)
+
+        // Locate the assistant message produced by the "general" subagent.
+        const msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(chat.id)))
+        const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general")
+        expect(taskMsg?.info.role).toBe("assistant")
+        if (!taskMsg || taskMsg.info.role !== "assistant") return
+
+        const tool = errorTool(taskMsg.parts)
+        if (!tool) return
+
+        expect(tool.state.error).toContain("Tool execution failed")
+        expect(tool.state.metadata).toBeDefined()
+        expect(tool.state.metadata?.sessionId).toBeDefined()
+        expect(tool.state.metadata?.model).toEqual({
+          providerID: ProviderID.make("test"),
+          modelID: ModelID.make("missing-model"),
+        })
+      }),
+    {
+      git: true,
+      config: {
+        ...cfg,
+        // Point the "general" agent at a model that is not registered so the
+        // subtask is guaranteed to fail.
+        agent: {
+          general: {
+            model: "test/missing-model",
+          },
+        },
+      },
+    },
+  ),
+)
+
+// The loop must publish a "busy" status while running and end on "idle";
+// status events are observed through the bus for this session only.
+it.effect("loop sets status to busy then idle", () =>
+  provideTmpdirInstance(
+    (dir) =>
+      Effect.gen(function* () {
+        const test = yield* TestLLM
+        const prompt = yield* SessionPrompt.Service
+        const sessions = yield* Session.Service
+        const bus = yield* Bus.Service
+
+        yield* test.reply(start(), textStart(), textDelta("t", "ok"), textEnd(), finishStep(), finish())
+
+        const chat = yield* sessions.create({})
+        yield* user(chat.id, "hi")
+
+        // Collect status transitions; resolve `idle` once the final state lands.
+        const types: string[] = []
+        const idle = defer()
+        const off = yield* bus.subscribeCallback(SessionStatus.Event.Status, (evt) => {
+          if (evt.properties.sessionID !== chat.id) return
+          types.push(evt.properties.status.type)
+          if (evt.properties.status.type === "idle") idle.resolve()
+        })
+
+        yield* prompt.loop({ sessionID: chat.id })
+        yield* Effect.promise(() => idle.promise)
+        off()
+
+        expect(types).toContain("busy")
+        expect(types[types.length - 1]).toBe("idle")
+      }),
+    { git: true, config: cfg },
+  ),
+)
+
+// Cancel semantics
+
+// cancel() while the LLM stream hangs must still resolve the loop fiber with
+// a successful exit carrying an assistant message.
+it.effect(
+  "cancel interrupts loop and resolves with an assistant message",
+  () =>
+    provideTmpdirInstance(
+      (dir) =>
+        Effect.gen(function* () {
+          const { test, prompt, chat } = yield* boot()
+          yield* seed(chat.id)
+
+          // Make LLM hang so the loop blocks
+          yield* test.push((input) => hang(input, start()))
+
+          // Seed a new user message so the loop enters the LLM path
+          yield* user(chat.id, "more")
+
+          const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+          // Give the loop time to start
+          yield* waitMs(200)
+          yield* prompt.cancel(chat.id)
+
+          const exit = yield* Fiber.await(fiber)
+          expect(Exit.isSuccess(exit)).toBe(true)
+          if (Exit.isSuccess(exit)) {
+            expect(exit.value.info.role).toBe("assistant")
+          }
+        }),
+      { git: true, config: cfg },
+    ),
+  30_000,
+)
+
+// An interrupted run must record MessageAbortedError on the assistant message
+// rather than losing the error. The `ready` deferred gates on the stream's
+// "start" event so cancel fires only after the LLM call is in flight.
+it.effect(
+  "cancel records MessageAbortedError on interrupted process",
+  () =>
+    provideTmpdirInstance(
+      (dir) =>
+        Effect.gen(function* () {
+          const ready = defer()
+          const { test, prompt, chat } = yield* boot()
+
+          yield* test.push((input) =>
+            hang(input, start()).pipe(
+              Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
+            ),
+          )
+          yield* user(chat.id, "hello")
+
+          const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+          yield* Effect.promise(() => ready.promise)
+          yield* prompt.cancel(chat.id)
+
+          const exit = yield* Fiber.await(fiber)
+          expect(Exit.isSuccess(exit)).toBe(true)
+          if (Exit.isSuccess(exit)) {
+            const info = exit.value.info
+            if (info.role === "assistant") {
+              expect(info.error?.name).toBe("MessageAbortedError")
+            }
+          }
+        }),
+      { git: true, config: cfg },
+    ),
+  30_000,
+)
+
+// Cancelling with a second caller queued behind the active run must resolve
+// both fibers successfully, and both must see the same assistant message.
+it.effect(
+  "cancel with queued callers resolves all cleanly",
+  () =>
+    provideTmpdirInstance(
+      (dir) =>
+        Effect.gen(function* () {
+          const ready = defer()
+          const { test, prompt, chat } = yield* boot()
+
+          yield* test.push((input) =>
+            hang(input, start()).pipe(
+              Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
+            ),
+          )
+          yield* user(chat.id, "hello")
+
+          const a = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+          yield* Effect.promise(() => ready.promise)
+          // Queue a second caller
+          const b = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+          yield* waitMs(50)
+
+          yield* prompt.cancel(chat.id)
+
+          const [exitA, exitB] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
+          expect(Exit.isSuccess(exitA)).toBe(true)
+          expect(Exit.isSuccess(exitB)).toBe(true)
+          if (Exit.isSuccess(exitA) && Exit.isSuccess(exitB)) {
+            expect(exitA.value.info.id).toBe(exitB.value.info.id)
+          }
+        }),
+      { git: true, config: cfg },
+    ),
+  30_000,
+)
+
+// Queue semantics
+
+// Two concurrent loop() callers on the same session must share one run and
+// therefore receive the identical assistant message.
+it.effect("concurrent loop callers get same result", () =>
+  provideTmpdirInstance(
+    (dir) =>
+      Effect.gen(function* () {
+        const { prompt, chat } = yield* boot()
+        yield* seed(chat.id, { finish: "stop" })
+
+        const [a, b] = yield* Effect.all([prompt.loop({ sessionID: chat.id }), prompt.loop({ sessionID: chat.id })], {
+          concurrency: "unbounded",
+        })
+
+        expect(a.info.id).toBe(b.info.id)
+        expect(a.info.role).toBe("assistant")
+        yield* prompt.assertNotBusy(chat.id)
+      }),
+    { git: true },
+  ),
+)
+
+// The sharing guarantee also holds on failure: both callers observe the same
+// assistant message with the error recorded on it.
+it.effect("concurrent loop callers all receive same error result", () =>
+  provideTmpdirInstance(
+    (dir) =>
+      Effect.gen(function* () {
+        const { test, prompt, chat } = yield* boot()
+
+        // Push a stream that fails — the loop records the error on the assistant message
+        yield* test.push(Stream.fail(new Error("boom")))
+        yield* user(chat.id, "hello")
+
+        const [a, b] = yield* Effect.all([prompt.loop({ sessionID: chat.id }), prompt.loop({ sessionID: chat.id })], {
+          concurrency: "unbounded",
+        })
+
+        // Both callers get the same assistant with an error recorded
+        expect(a.info.id).toBe(b.info.id)
+        expect(a.info.role).toBe("assistant")
+        if (a.info.role === "assistant") {
+          expect(a.info.error).toBeDefined()
+        }
+        if (b.info.role === "assistant") {
+          expect(b.info.error).toBeDefined()
+        }
+      }),
+    { git: true, config: cfg },
+  ),
+)
+
+// A prompt submitted while a run is active must be queued and included in the
+// NEXT LLM call's input. Two deferreds drive the timing: `ready` resolves when
+// the first stream starts; `gate` holds the first stream open until the second
+// prompt has been persisted.
+it.effect(
+  "prompt submitted during an active run is included in the next LLM input",
+  () =>
+    provideTmpdirInstance(
+      (dir) =>
+        Effect.gen(function* () {
+          const ready = defer()
+          const gate = defer()
+          const { test, prompt, sessions, chat } = yield* boot()
+
+          // First LLM stream: emit "start", then block on `gate` before
+          // producing the text + finish events.
+          yield* test.push((_input) =>
+            stream(start()).pipe(
+              Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
+              Stream.concat(
+                Stream.fromEffect(Effect.promise(() => gate.promise)).pipe(
+                  Stream.flatMap(() =>
+                    stream(textStart("a"), textDelta("a", "first"), textEnd("a"), finishStep(), finish()),
+                  ),
+                ),
+              ),
+            ),
+          )
+
+          const a = yield* prompt
+            .prompt({
+              sessionID: chat.id,
+              agent: "build",
+              model: ref,
+              parts: [{ type: "text", text: "first" }],
+            })
+            .pipe(Effect.forkChild)
+
+          yield* Effect.promise(() => ready.promise)
+
+          // Submit a second prompt while the first run is mid-stream.
+          const id = MessageID.ascending()
+          const b = yield* prompt
+            .prompt({
+              sessionID: chat.id,
+              messageID: id,
+              agent: "build",
+              model: ref,
+              parts: [{ type: "text", text: "second" }],
+            })
+            .pipe(Effect.forkChild)
+
+          // Poll until the queued user message is persisted before opening the gate.
+          yield* Effect.promise(async () => {
+            const end = Date.now() + 5000
+            while (Date.now() < end) {
+              const msgs = await Effect.runPromise(sessions.messages({ sessionID: chat.id }))
+              if (msgs.some((msg) => msg.info.role === "user" && msg.info.id === id)) return
+              await new Promise((done) => setTimeout(done, 20))
+            }
+            throw new Error("timed out waiting for second prompt to save")
+          })
+
+          yield* test.reply(...replyStop("second"))
+          gate.resolve()
+
+          const [ea, eb] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
+          expect(Exit.isSuccess(ea)).toBe(true)
+          expect(Exit.isSuccess(eb)).toBe(true)
+          expect(yield* test.calls).toBe(2)
+
+          // The second assistant must descend from the queued user message.
+          const msgs = yield* sessions.messages({ sessionID: chat.id })
+          const assistants = msgs.filter((msg) => msg.info.role === "assistant")
+          expect(assistants).toHaveLength(2)
+          const last = assistants.at(-1)
+          if (!last || last.info.role !== "assistant") throw new Error("expected second assistant")
+          expect(last.info.parentID).toBe(id)
+          expect(last.parts.some((part) => part.type === "text" && part.text === "second")).toBe(true)
+
+          // The queued text must appear in the second LLM call's input.
+          const inputs = yield* test.inputs
+          expect(inputs).toHaveLength(2)
+          expect(JSON.stringify(inputs.at(-1)?.messages)).toContain("second")
+        }),
+      { git: true, config: cfg },
+    ),
+  30_000,
+)
+
+// assertNotBusy must fail with Session.BusyError while a loop is in flight.
+it.effect(
+  "assertNotBusy throws BusyError when loop running",
+  () =>
+    provideTmpdirInstance(
+      (dir) =>
+        Effect.gen(function* () {
+          const ready = defer()
+          const test = yield* TestLLM
+          const prompt = yield* SessionPrompt.Service
+          const sessions = yield* Session.Service
+
+          // Hang the LLM stream; `ready` resolves once the run has started.
+          yield* test.push((input) =>
+            hang(input, start()).pipe(
+              Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
+            ),
+          )
+
+          const chat = yield* sessions.create({})
+          yield* user(chat.id, "hi")
+
+          const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+          yield* Effect.promise(() => ready.promise)
+
+          const exit = yield* prompt.assertNotBusy(chat.id).pipe(Effect.exit)
+          expect(Exit.isFailure(exit)).toBe(true)
+          if (Exit.isFailure(exit)) {
+            expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError)
+          }
+
+          // Clean up the hung run before the scope closes.
+          yield* prompt.cancel(chat.id)
+          yield* Fiber.await(fiber)
+        }),
+      { git: true, config: cfg },
+    ),
+  30_000,
+)
+
+// assertNotBusy on a fresh, idle session must succeed.
+it.effect("assertNotBusy succeeds when idle", () =>
+  provideTmpdirInstance(
+    (dir) =>
+      Effect.gen(function* () {
+        const prompt = yield* SessionPrompt.Service
+        const sessions = yield* Session.Service
+
+        const chat = yield* sessions.create({})
+        const exit = yield* prompt.assertNotBusy(chat.id).pipe(Effect.exit)
+        expect(Exit.isSuccess(exit)).toBe(true)
+      }),
+    { git: true },
+  ),
+)
+
+// Shell semantics
+
+// shell() must refuse to start while a loop is running, failing with BusyError.
+it.effect(
+  "shell rejects with BusyError when loop running",
+  () =>
+    provideTmpdirInstance(
+      (dir) =>
+        Effect.gen(function* () {
+          const ready = defer()
+          const { test, prompt, chat } = yield* boot()
+
+          // Hang the LLM stream; `ready` resolves once the run has started.
+          yield* test.push((input) =>
+            hang(input, start()).pipe(
+              Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
+            ),
+          )
+          yield* user(chat.id, "hi")
+
+          const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+          yield* Effect.promise(() => ready.promise)
+
+          const exit = yield* prompt.shell({ sessionID: chat.id, agent: "build", command: "echo hi" }).pipe(Effect.exit)
+          expect(Exit.isFailure(exit)).toBe(true)
+          if (Exit.isFailure(exit)) {
+            expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError)
+          }
+
+          // Clean up the hung run before the scope closes.
+          yield* prompt.cancel(chat.id)
+          yield* Fiber.await(fiber)
+        }),
+      { git: true, config: cfg },
+    ),
+  30_000,
+)
+
+// A completed shell tool part must carry both stdout and stderr in its output
+// and in its metadata. (unix-only: relies on printf and >&2 redirection)
+unix("shell captures stdout and stderr in completed tool output", () =>
+  provideTmpdirInstance(
+    (dir) =>
+      Effect.gen(function* () {
+        const { prompt, chat } = yield* boot()
+        const result = yield* prompt.shell({
+          sessionID: chat.id,
+          agent: "build",
+          command: "printf out && printf err >&2",
+        })
+
+        expect(result.info.role).toBe("assistant")
+        const tool = completedTool(result.parts)
+        if (!tool) return
+
+        expect(tool.state.output).toContain("out")
+        expect(tool.state.output).toContain("err")
+        expect(tool.state.metadata.output).toContain("out")
+        expect(tool.state.metadata.output).toContain("err")
+        yield* prompt.assertNotBusy(chat.id)
+      }),
+    { git: true, config: cfg },
+  ),
+)
+
+// While a shell command is still running, the tool part's metadata must be
+// updated incrementally with its output (here: "first" before the sleep ends).
+unix(
+  "shell updates running metadata before process exit",
+  () =>
+    withSh(() =>
+      provideTmpdirInstance(
+        (dir) =>
+          Effect.gen(function* () {
+            const { prompt, chat } = yield* boot()
+
+            const fiber = yield* prompt
+              .shell({ sessionID: chat.id, agent: "build", command: "printf first && sleep 0.2 && printf second" })
+              .pipe(Effect.forkChild)
+
+            // Poll until the running tool part reflects the partial output.
+            yield* Effect.promise(async () => {
+              // NOTE(review): this local `start` shadows the imported `start()`
+              // stream-event helper used elsewhere in this file — consider
+              // renaming it (e.g. `deadline`) to avoid confusion.
+              const start = Date.now()
+              while (Date.now() - start < 5000) {
+                const msgs = await MessageV2.filterCompacted(MessageV2.stream(chat.id))
+                const taskMsg = msgs.find((item) => item.info.role === "assistant")
+                const tool = taskMsg ? toolPart(taskMsg.parts) : undefined
+                if (tool?.state.status === "running" && tool.state.metadata?.output.includes("first")) return
+                await new Promise((done) => setTimeout(done, 20))
+              }
+              throw new Error("timed out waiting for running shell metadata")
+            })
+
+            const exit = yield* Fiber.await(fiber)
+            expect(Exit.isSuccess(exit)).toBe(true)
+          }),
+        { git: true, config: cfg },
+      ),
+    ),
+  30_000,
+)
+
+// loop() must not call the LLM while a shell is active; it starts only after
+// the shell fiber completes (calls goes 0 -> 1 across the await).
+unix(
+  "loop waits while shell runs and starts after shell exits",
+  () =>
+    provideTmpdirInstance(
+      (dir) =>
+        Effect.gen(function* () {
+          const { test, prompt, chat } = yield* boot()
+          yield* test.reply(...replyStop("after-shell"))
+
+          const sh = yield* prompt
+            .shell({ sessionID: chat.id, agent: "build", command: "sleep 0.2" })
+            .pipe(Effect.forkChild)
+          yield* waitMs(50)
+
+          const run = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+          yield* waitMs(50)
+
+          // Still blocked behind the shell: no LLM call yet.
+          expect(yield* test.calls).toBe(0)
+
+          yield* Fiber.await(sh)
+          const exit = yield* Fiber.await(run)
+
+          expect(Exit.isSuccess(exit)).toBe(true)
+          if (Exit.isSuccess(exit)) {
+            expect(exit.value.info.role).toBe("assistant")
+            expect(exit.value.parts.some((part) => part.type === "text" && part.text === "after-shell")).toBe(true)
+          }
+          expect(yield* test.calls).toBe(1)
+        }),
+      { git: true, config: cfg },
+    ),
+  30_000,
+)
+
+// Loop callers queued behind a running shell must resume once the shell
+// finishes, sharing a single LLM run (same assistant id, one call).
+unix(
+  "shell completion resumes queued loop callers",
+  () =>
+    provideTmpdirInstance(
+      (dir) =>
+        Effect.gen(function* () {
+          const { test, prompt, chat } = yield* boot()
+          yield* test.reply(...replyStop("done"))
+
+          const sh = yield* prompt
+            .shell({ sessionID: chat.id, agent: "build", command: "sleep 0.2" })
+            .pipe(Effect.forkChild)
+          yield* waitMs(50)
+
+          const a = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+          const b = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+          yield* waitMs(50)
+
+          // Both loop callers are still queued behind the shell.
+          expect(yield* test.calls).toBe(0)
+
+          yield* Fiber.await(sh)
+          const [ea, eb] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
+
+          expect(Exit.isSuccess(ea)).toBe(true)
+          expect(Exit.isSuccess(eb)).toBe(true)
+          if (Exit.isSuccess(ea) && Exit.isSuccess(eb)) {
+            expect(ea.value.info.id).toBe(eb.value.info.id)
+            expect(ea.value.info.role).toBe("assistant")
+          }
+          expect(yield* test.calls).toBe(1)
+        }),
+      { git: true, config: cfg },
+    ),
+  30_000,
+)
+
+// cancel() on a long-running shell must leave the session idle and resolve the
+// shell fiber with an assistant message whose tool output records the abort.
+unix(
+  "cancel interrupts shell and resolves cleanly",
+  () =>
+    withSh(() =>
+      provideTmpdirInstance(
+        (dir) =>
+          Effect.gen(function* () {
+            const { prompt, chat } = yield* boot()
+
+            const sh = yield* prompt
+              .shell({ sessionID: chat.id, agent: "build", command: "sleep 30" })
+              .pipe(Effect.forkChild)
+            yield* waitMs(50)
+
+            yield* prompt.cancel(chat.id)
+
+            // Cancellation must return the session to idle / not-busy.
+            const status = yield* SessionStatus.Service
+            expect((yield* status.get(chat.id)).type).toBe("idle")
+            const busy = yield* prompt.assertNotBusy(chat.id).pipe(Effect.exit)
+            expect(Exit.isSuccess(busy)).toBe(true)
+
+            const exit = yield* Fiber.await(sh)
+            expect(Exit.isSuccess(exit)).toBe(true)
+            if (Exit.isSuccess(exit)) {
+              expect(exit.value.info.role).toBe("assistant")
+              const tool = completedTool(exit.value.parts)
+              if (tool) {
+                expect(tool.state.output).toContain("User aborted the command")
+              }
+            }
+          }),
+        { git: true, config: cfg },
+      ),
+    ),
+  30_000,
+)
+
+// Even when the child shell traps/ignores SIGTERM, cancel() must still persist
+// an aborted result (the runner escalates rather than hanging forever).
+unix(
+  "cancel persists aborted shell result when shell ignores TERM",
+  () =>
+    withSh(() =>
+      provideTmpdirInstance(
+        (dir) =>
+          Effect.gen(function* () {
+            const { prompt, chat } = yield* boot()
+
+            const sh = yield* prompt
+              .shell({ sessionID: chat.id, agent: "build", command: "trap '' TERM; sleep 30" })
+              .pipe(Effect.forkChild)
+            yield* waitMs(50)
+
+            yield* prompt.cancel(chat.id)
+
+            const exit = yield* Fiber.await(sh)
+            expect(Exit.isSuccess(exit)).toBe(true)
+            if (Exit.isSuccess(exit)) {
+              expect(exit.value.info.role).toBe("assistant")
+              const tool = completedTool(exit.value.parts)
+              if (tool) {
+                expect(tool.state.output).toContain("User aborted the command")
+              }
+            }
+          }),
+        { git: true, config: cfg },
+      ),
+    ),
+  30_000,
+)
+
+// cancel() must also release a loop caller that is queued behind a running
+// shell — both fibers resolve instead of deadlocking.
+unix(
+  "cancel interrupts loop queued behind shell",
+  () =>
+    provideTmpdirInstance(
+      (dir) =>
+        Effect.gen(function* () {
+          const { prompt, chat } = yield* boot()
+
+          const sh = yield* prompt
+            .shell({ sessionID: chat.id, agent: "build", command: "sleep 30" })
+            .pipe(Effect.forkChild)
+          yield* waitMs(50)
+
+          const run = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+          yield* waitMs(50)
+
+          yield* prompt.cancel(chat.id)
+
+          const exit = yield* Fiber.await(run)
+          expect(Exit.isSuccess(exit)).toBe(true)
+
+          yield* Fiber.await(sh)
+        }),
+      { git: true, config: cfg },
+    ),
+  30_000,
+)
+
+// Only one shell may run per session at a time; a second shell() call while
+// the first is active must fail with Session.BusyError.
+unix(
+  "shell rejects when another shell is already running",
+  () =>
+    withSh(() =>
+      provideTmpdirInstance(
+        (dir) =>
+          Effect.gen(function* () {
+            const { prompt, chat } = yield* boot()
+
+            const a = yield* prompt
+              .shell({ sessionID: chat.id, agent: "build", command: "sleep 30" })
+              .pipe(Effect.forkChild)
+            yield* waitMs(50)
+
+            const exit = yield* prompt
+              .shell({ sessionID: chat.id, agent: "build", command: "echo hi" })
+              .pipe(Effect.exit)
+            expect(Exit.isFailure(exit)).toBe(true)
+            if (Exit.isFailure(exit)) {
+              expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError)
+            }
+
+            // Clean up the first shell before the scope closes.
+            yield* prompt.cancel(chat.id)
+            yield* Fiber.await(a)
+          }),
+        { git: true, config: cfg },
+      ),
+    ),
+  30_000,
+)