diff --git a/packages/opencode/src/permission/index.ts b/packages/opencode/src/permission/index.ts index 321c5c374e39..28bb3db81190 100644 --- a/packages/opencode/src/permission/index.ts +++ b/packages/opencode/src/permission/index.ts @@ -118,6 +118,7 @@ export namespace PermissionNext { readonly ask: (input: z.infer) => Effect.Effect readonly reply: (input: z.infer) => Effect.Effect readonly list: () => Effect.Effect + readonly rejectSession: (sessionID: SessionID) => Effect.Effect } interface PendingEntry { @@ -239,7 +240,21 @@ export namespace PermissionNext { return Array.from(pending.values(), (item) => item.info) }) - return Service.of({ ask, reply, list }) + const rejectSession = Effect.fn("Permission.rejectSession")(function* (sessionID: SessionID) { + for (const [id, entry] of pending.entries()) { + if (entry.info.sessionID !== sessionID) continue + pending.delete(id) + log.info("rejecting for cancelled session", { requestID: id, sessionID }) + Bus.publish(Event.Replied, { + sessionID: entry.info.sessionID, + requestID: entry.info.id, + reply: "reject", + }) + yield* Deferred.fail(entry.deferred, new RejectedError()) + } + }) + + return Service.of({ ask, reply, list, rejectSession }) }), ) @@ -277,6 +292,10 @@ export namespace PermissionNext { return runPromiseInstance(Service.use((svc) => svc.list())) } + export async function rejectSession(sessionID: SessionID) { + return runPromiseInstance(Service.use((svc) => svc.rejectSession(sessionID))) + } + const EDIT_TOOLS = ["edit", "write", "apply_patch", "multiedit"] export function disabled(tools: string[], ruleset: Ruleset): Set { diff --git a/packages/opencode/src/question/index.ts b/packages/opencode/src/question/index.ts index 551c5139999a..3a067853b8c7 100644 --- a/packages/opencode/src/question/index.ts +++ b/packages/opencode/src/question/index.ts @@ -97,6 +97,7 @@ export namespace Question { readonly reply: (input: { requestID: QuestionID; answers: Answer[] }) => Effect.Effect readonly reject: (requestID: QuestionID) => Effect.Effect readonly list: () => Effect.Effect + readonly rejectSession: (sessionID: SessionID) => Effect.Effect } export class Service extends ServiceMap.Service()("@opencode/Question") {} @@ -167,7 +168,20 @@ export namespace Question { return Array.from(pending.values(), (x) => x.info) }) - return Service.of({ ask, reply, reject, list }) + const rejectSession = Effect.fn("Question.rejectSession")(function* (sessionID: SessionID) { + for (const [id, entry] of pending.entries()) { + if (entry.info.sessionID !== sessionID) continue + pending.delete(id) + log.info("rejecting for cancelled session", { requestID: id, sessionID }) + Bus.publish(Event.Rejected, { + sessionID: entry.info.sessionID, + requestID: entry.info.id, + }) + yield* Deferred.fail(entry.deferred, new RejectedError()) + } + }) + + return Service.of({ ask, reply, reject, list, rejectSession }) }), ) @@ -190,4 +204,8 @@ export namespace Question { export async function list(): Promise { return runPromiseInstance(Service.use((svc) => svc.list())) } + + export async function rejectSession(sessionID: SessionID): Promise { + return runPromiseInstance(Service.use((svc) => svc.rejectSession(sessionID))) + } } diff --git a/packages/opencode/src/session/activity.ts b/packages/opencode/src/session/activity.ts new file mode 100644 index 000000000000..2a14330c950b --- /dev/null +++ b/packages/opencode/src/session/activity.ts @@ -0,0 +1,78 @@ +import { Bus } from "@/bus" +import { Instance } from "@/project/instance" +import { MessageV2 } from "./message-v2" +import { Log } from "@/util/log" + +const log = Log.create({ service: "session.activity" }) + +/** + * Tracks per-session last-activity timestamps via Bus events. + * + * Activity signals: + * - message.part.delta (token streaming — highest frequency) + * - message.part.updated (tool state changes, text completions) + * + * The watchdog queries `stale(sessionID, threshold)` to detect sessions + * that have had no activity for longer than the threshold, distinguishing + * "genuinely stuck / idle" from "actively streaming or executing tools". + * + * Root sessions (no parentID) are never subject to idle detection — only + * subagent sessions spawned by the task tool are monitored. + */ +export namespace SessionActivity { + const state = Instance.state(() => { + const data: Record = {} + return data + }) + + /** Update the last-activity timestamp for a session. */ + export function touch(id: string, now = Date.now()) { + state()[id] = now + } + + /** Return the last-activity timestamp, or undefined if never recorded. */ + export function last(id: string): number | undefined { + return state()[id] + } + + /** + * True when the session has had no activity for longer than `threshold` ms. + * Returns false for sessions with no recorded activity (they haven't + * started yet, so they aren't stale). + */ + export function stale(id: string, threshold: number): boolean { + const ts = state()[id] + if (ts === undefined) return false + return Date.now() - ts > threshold + } + + /** Remove tracking for a session (cleanup on cancel / completion). */ + export function remove(id: string) { + delete state()[id] + } + + /** Snapshot of all tracked sessions — for diagnostics / logging. */ + export function list() { + return { ...state() } + } + + /** + * Subscribe to Bus events that indicate session activity. + * Call once during bootstrap (idempotent per Instance lifecycle + * since Instance.state is scoped to the current instance). + */ + export function init() { + log.info("init") + + // Token deltas — fires on every chunk from the LLM stream. + // This is the highest-frequency signal and acts as a natural heartbeat. + Bus.subscribe(MessageV2.Event.PartDelta, (evt) => { + touch(evt.properties.sessionID) + }) + + // Part state changes — tool start/complete/error, text start/end, etc. + Bus.subscribe(MessageV2.Event.PartUpdated, (evt) => { + touch(evt.properties.part.sessionID) + }) + } +} diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index 8200dea7564d..17274d1d0604 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -5,6 +5,7 @@ import { Agent } from "@/agent/agent" import { Snapshot } from "@/snapshot" import { SessionSummary } from "./summary" import { Bus } from "@/bus" +import { BusEvent } from "@/bus/bus-event" import { SessionRetry } from "./retry" import { SessionStatus } from "./status" import { Plugin } from "@/plugin" @@ -14,10 +15,14 @@ import { Config } from "@/config/config" import { SessionCompaction } from "./compaction" import { PermissionNext } from "@/permission" import { Question } from "@/question" -import { PartID } from "./schema" -import type { SessionID, MessageID } from "./schema" +import { PartID, SessionID } from "./schema" +import type { MessageID } from "./schema" +import z from "zod" export namespace SessionProcessor { + export const Event = { + CancelRequested: BusEvent.define("session.prompt.cancel", z.object({ sessionID: SessionID.zod })), + } const DOOM_LOOP_THRESHOLD = 3 const log = Log.create({ service: "session.processor" }) diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index bac958ec1033..7521ecf7df41 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -2,6 +2,7 @@ import path from "path" import os from "os" import fs from "fs/promises" import z from "zod" +import { SessionActivity } from "./activity" import { Filesystem } from "../util/filesystem" import { SessionID, MessageID, PartID } from "./schema" import { MessageV2 } from "./message-v2" @@ -42,6 +43,7 @@ import { SessionProcessor } from "./processor" import { TaskTool } from "@/tool/task" import { Tool } from "@/tool/tool" import { PermissionNext } from "@/permission" +import { Question } from "@/question" import { SessionStatus } from "./status" import { LLM } from "./llm" import { iife } from "@/util/iife" @@ -76,6 +78,7 @@ export namespace SessionPrompt { resolve(input: MessageV2.WithParts): void reject(reason?: any): void }[] + generation: number } > = {} return data @@ -87,6 +90,26 @@ export namespace SessionPrompt { }, ) + + let gen = 0 + const precancelled = new Map() + + export const SessionCancelledError = NamedError.create( + "SessionCancelledError", + z.object({ sessionID: SessionID.zod }), + ) + + /** @internal Exported for testing */ + export const _state = state + /** @internal Exported for testing */ + export const _precancelled = precancelled + + export function init() { + const log = Log.create({ service: "session.prompt" }) + log.info("init") + + Bus.subscribe(SessionProcessor.Event.CancelRequested, (evt) => cancel(evt.properties.sessionID)) + } export function assertNotBusy(sessionID: SessionID) { const match = state()[sessionID] if (match) throw new Session.BusyError(sessionID) @@ -239,13 +262,20 @@ export namespace SessionPrompt { return parts } - function start(sessionID: SessionID) { + /** @internal Exported for testing */ + export function start(sessionID: SessionID) { const s = state() if (s[sessionID]) return + if (precancelled.has(sessionID)) { + precancelled.delete(sessionID) + return + } const controller = new AbortController() + gen++ s[sessionID] = { abort: controller, callbacks: [], + generation: gen, } return controller.signal } @@ -257,18 +287,25 @@ export namespace SessionPrompt { return s[sessionID].abort.signal } - export function cancel(sessionID: SessionID) { - log.info("cancel", { sessionID }) + export function cancel(sessionID: SessionID, generation?: number) { + log.info("cancel", { sessionID, generation }) const s = state() const match = s[sessionID] - if (!match) { + if (match) { + if (generation !== undefined && generation !== match.generation) return + for (const cb of match.callbacks) cb.reject(new SessionCancelledError({ sessionID })) + match.abort.abort() + SessionActivity.remove(sessionID) + delete s[sessionID] + // Reject any pending permission/question promises so tool calls unblock + PermissionNext.rejectSession(sessionID).catch(() => {}) + Question.rejectSession(sessionID).catch(() => {}) SessionStatus.set(sessionID, { type: "idle" }) return } - match.abort.abort() - delete s[sessionID] + if (generation !== undefined) return + precancelled.set(sessionID, Date.now()) SessionStatus.set(sessionID, { type: "idle" }) - return } export const LoopInput = z.object({ @@ -280,13 +317,15 @@ export namespace SessionPrompt { const abort = resume_existing ? resume(sessionID) : start(sessionID) if (!abort) { + const entry = state()[sessionID] + if (!entry) throw new SessionCancelledError({ sessionID }) return new Promise((resolve, reject) => { - const callbacks = state()[sessionID].callbacks - callbacks.push({ resolve, reject }) + entry.callbacks.push({ resolve, reject }) }) } - using _ = defer(() => cancel(sessionID)) + const g = state()[sessionID].generation + using _ = defer(() => cancel(sessionID, g)) // Structured output state // Note: On session resumption, state is reset but outputFormat is preserved diff --git a/packages/opencode/test/question/nested-question.test.ts b/packages/opencode/test/question/nested-question.test.ts new file mode 100644 index 000000000000..022df735b30a --- /dev/null +++ b/packages/opencode/test/question/nested-question.test.ts @@ -0,0 +1,234 @@ +import { test, expect } from "bun:test" +import { Question } from "../../src/question" +import { Instance } from "../../src/project/instance" +import { SessionID } from "../../src/session/schema" +import { tmpdir } from "../fixture/fixture" + +// Nested session question propagation tests +// Validates that Question.ask/list/reply work across parent/child session boundaries + +test("list - returns questions from multiple sessions (parent + child)", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + // Parent session asks a question + Question.ask({ + sessionID: SessionID.make("ses_parent"), + questions: [ + { + question: "Parent question?", + header: "Parent", + options: [{ label: "A", description: "Option A" }], + }, + ], + }) + + // Child session asks a question + Question.ask({ + sessionID: SessionID.make("ses_child"), + questions: [ + { + question: "Child question?", + header: "Child", + options: [{ label: "B", description: "Option B" }], + }, + ], + }) + + // list() returns ALL pending questions regardless of session + const pending = await Question.list() + expect(pending.length).toBe(2) + const ids = pending.map((p) => p.sessionID) + expect(ids).toContain(SessionID.make("ses_parent")) + expect(ids).toContain(SessionID.make("ses_child")) + }, + }) +}) + +test("reply - can answer child session question from parent context", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + // Child session asks + const ask = Question.ask({ + sessionID: SessionID.make("ses_child"), + questions: [ + { + question: "Child needs input", + header: "Input", + options: [{ label: "Yes", description: "Confirm" }], + }, + ], + }) + + // Parent context can see and reply + const pending = await Question.list() + expect(pending.length).toBe(1) + expect(pending[0].sessionID).toBe(SessionID.make("ses_child")) + + await Question.reply({ + requestID: pending[0].id, + answers: [["Yes"]], + }) + + const answers = await ask + expect(answers).toEqual([["Yes"]]) + }, + }) +}) + +test("reject - can reject child session question from parent context", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const ask = Question.ask({ + sessionID: SessionID.make("ses_child"), + questions: [ + { + question: "Child needs input", + header: "Input", + options: [{ label: "Yes", description: "Confirm" }], + }, + ], + }) + + const pending = await Question.list() + await Question.reject(pending[0].id) + + await expect(ask).rejects.toBeInstanceOf(Question.RejectedError) + }, + }) +}) + +test("list - returns questions from deeply nested sessions", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + // Simulate 3 levels: parent -> child -> grandchild + Question.ask({ + sessionID: SessionID.make("ses_parent"), + questions: [ + { + question: "Level 0?", + header: "L0", + options: [{ label: "A", description: "A" }], + }, + ], + }) + + Question.ask({ + sessionID: SessionID.make("ses_child"), + questions: [ + { + question: "Level 1?", + header: "L1", + options: [{ label: "B", description: "B" }], + }, + ], + }) + + Question.ask({ + sessionID: SessionID.make("ses_grandchild"), + questions: [ + { + question: "Level 2?", + header: "L2", + options: [{ label: "C", description: "C" }], + }, + ], + }) + + const pending = await Question.list() + expect(pending.length).toBe(3) + }, + }) +}) + +test("rejectSession - only rejects questions for the specified session", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const child = Question.ask({ + sessionID: SessionID.make("ses_child"), + questions: [ + { + question: "Child?", + header: "C", + options: [{ label: "X", description: "X" }], + }, + ], + }) + + Question.ask({ + sessionID: SessionID.make("ses_other"), + questions: [ + { + question: "Other?", + header: "O", + options: [{ label: "Y", description: "Y" }], + }, + ], + }) + + // Reject only child session + await Question.rejectSession(SessionID.make("ses_child")) + + await expect(child).rejects.toBeInstanceOf(Question.RejectedError) + + // Other session's question should still be pending + const pending = await Question.list() + expect(pending.length).toBe(1) + expect(pending[0].sessionID).toBe(SessionID.make("ses_other")) + }, + }) +}) + +test("reply - resolving parent question does not affect child questions", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const parent = Question.ask({ + sessionID: SessionID.make("ses_parent"), + questions: [ + { + question: "Parent?", + header: "P", + options: [{ label: "A", description: "A" }], + }, + ], + }) + + Question.ask({ + sessionID: SessionID.make("ses_child"), + questions: [ + { + question: "Child?", + header: "C", + options: [{ label: "B", description: "B" }], + }, + ], + }) + + // Reply to parent only + const pending = await Question.list() + const req = pending.find((p) => p.sessionID === SessionID.make("ses_parent"))! + await Question.reply({ + requestID: req.id, + answers: [["A"]], + }) + + await expect(parent).resolves.toEqual([["A"]]) + + // Child question should still be pending + const remaining = await Question.list() + expect(remaining.length).toBe(1) + expect(remaining[0].sessionID).toBe(SessionID.make("ses_child")) + }, + }) +}) diff --git a/packages/opencode/test/session/cancel.test.ts b/packages/opencode/test/session/cancel.test.ts new file mode 100644 index 000000000000..e9b70e8ddfbb --- /dev/null +++ b/packages/opencode/test/session/cancel.test.ts @@ -0,0 +1,489 @@ +import { describe, expect, spyOn, test } from "bun:test" +import { Bus } from "../../src/bus" +import { Instance } from "../../src/project/instance" +import { SessionActivity } from "../../src/session/activity" +import { SessionProcessor } from "../../src/session/processor" +import { SessionPrompt } from "../../src/session/prompt" +import { SessionID } from "../../src/session/schema" +import { SessionStatus } from "../../src/session/status" +import { Log } from "../../src/util/log" +import { tmpdir } from "../fixture/fixture" + +Log.init({ print: false }) + +describe("SessionPrompt cancel", () => { + test("Property 1: Generation Isolation — stale cancel does not abort new controller", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const id = SessionID.make("session_gen_iso") + + // First start — creates state with generation g1 + const signal1 = SessionPrompt.start(id) + expect(signal1).toBeDefined() + const g1 = SessionPrompt._state()[id].generation + + // Unconditional cancel to clear state + SessionPrompt.cancel(id) + expect(signal1!.aborted).toBe(true) + + // Second start — creates state with generation g2 > g1 + const signal2 = SessionPrompt.start(id) + expect(signal2).toBeDefined() + const g2 = SessionPrompt._state()[id].generation + expect(g2).toBeGreaterThan(g1) + + // Stale cancel with g1 — must NOT abort signal2 + SessionPrompt.cancel(id, g1) + expect(signal2!.aborted).toBe(false) + // State still exists + expect(SessionPrompt._state()[id]).toBeDefined() + + // Fresh cancel with no generation — aborts signal2 + SessionPrompt.cancel(id) + expect(signal2!.aborted).toBe(true) + }, + }) + }) + + test("Property 2: Pre-Cancel Completeness — cancel before start returns undefined", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const id = SessionID.make("session_precancel") + + // Cancel before any state exists + SessionPrompt.cancel(id) + expect(SessionPrompt._precancelled.has(id)).toBe(true) + + // Start should return undefined (pre-cancelled) and consume the entry + const signal = SessionPrompt.start(id) + expect(signal).toBeUndefined() + expect(SessionPrompt._precancelled.has(id)).toBe(false) + + // Subsequent start works normally (pre-cancel consumed) + const signal2 = SessionPrompt.start(id) + expect(signal2).toBeDefined() + + // Clean up + SessionPrompt.cancel(id) + }, + }) + }) + + test("Property 3: Cancel Idempotency — SessionActivity.remove called at most once", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const spy = spyOn(SessionActivity, "remove") + try { + const id = SessionID.make("session_idempotent") + + // Start a session + const signal = SessionPrompt.start(id) + expect(signal).toBeDefined() + + // Cancel three times + SessionPrompt.cancel(id) + SessionPrompt.cancel(id) + SessionPrompt.cancel(id) + + // remove called exactly once (first cancel hit state, subsequent are no-ops) + const calls = spy.mock.calls.filter((c) => c[0] === id) + expect(calls).toHaveLength(1) + } finally { + spy.mockRestore() + } + }, + }) + }) + + test("Property 4: Callback Completeness — all queued callbacks rejected on cancel", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const id = SessionID.make("session_callbacks") + + // Start session + SessionPrompt.start(id) + const entry = SessionPrompt._state()[id] + + // Queue 3 callbacks + const errors: unknown[] = [] + const promises = Array.from({ length: 3 }, () => + new Promise((resolve, reject) => { + entry.callbacks.push({ resolve, reject }) + }).catch((e) => { + errors.push(e) + }), + ) + + // Cancel — should reject all + SessionPrompt.cancel(id) + await Promise.all(promises) + + expect(errors).toHaveLength(3) + for (const err of errors) { + expect(SessionPrompt.SessionCancelledError.isInstance(err)).toBe(true) + } + }, + }) + }) + + test("Property 5: Activity Tracking — remove not called for no-op cancel", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const spy = spyOn(SessionActivity, "remove") + try { + const id = SessionID.make("session_nostate") + + // Cancel with no existing state — should NOT call remove + SessionPrompt.cancel(id) + const calls = spy.mock.calls.filter((c) => c[0] === id) + expect(calls).toHaveLength(0) + } finally { + spy.mockRestore() + } + }, + }) + }) + + test("Property 5b: Activity Tracking — remove not called for stale generation cancel", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const spy = spyOn(SessionActivity, "remove") + try { + const id = SessionID.make("session_stale_gen") + + SessionPrompt.start(id) + const g = SessionPrompt._state()[id].generation + + // Cancel with stale generation (gen - 1) — should NOT call remove + SessionPrompt.cancel(id, g - 1) + const calls = spy.mock.calls.filter((c) => c[0] === id) + expect(calls).toHaveLength(0) + + // State still exists + expect(SessionPrompt._state()[id]).toBeDefined() + + // Clean up + SessionPrompt.cancel(id) + } finally { + spy.mockRestore() + } + }, + }) + }) + + test("Kill Chain A: deferred cancel with stale gen does not abort new controller", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const id = SessionID.make("session_killchain_a") + + // Simulate loop N: start and capture generation + const signal1 = SessionPrompt.start(id) + expect(signal1).toBeDefined() + const g1 = SessionPrompt._state()[id].generation + + // Parent cancels the session (unconditional, as it would in production) + SessionPrompt.cancel(id) + expect(signal1!.aborted).toBe(true) + + // Retry starts loop N+1 + const signal2 = SessionPrompt.start(id) + expect(signal2).toBeDefined() + const g2 = SessionPrompt._state()[id].generation + expect(g2).toBeGreaterThan(g1) + + // Deferred cancel from loop N fires with stale g1 + SessionPrompt.cancel(id, g1) + + // Loop N+1's controller must NOT be aborted + expect(signal2!.aborted).toBe(false) + expect(SessionPrompt._state()[id]).toBeDefined() + expect(SessionPrompt._state()[id].generation).toBe(g2) + + // Clean up + SessionPrompt.cancel(id) + }, + }) + }) + + test("Kill Chain B: cancel before start prevents zombie session", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const id = SessionID.make("session_killchain_b") + + // Parent cancels child before child starts + SessionPrompt.cancel(id) + expect(SessionPrompt._precancelled.has(id)).toBe(true) + + // Child's start() consumes the pre-cancel — returns undefined + const signal = SessionPrompt.start(id) + expect(signal).toBeUndefined() + + // No state created — zombie prevented + expect(SessionPrompt._state()[id]).toBeUndefined() + expect(SessionPrompt._precancelled.has(id)).toBe(false) + }, + }) + }) + + test("Kill Chain B2: loop() rejects with SessionCancelledError when pre-cancelled", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const id = SessionID.make("session_loop_precancel") + + // Pre-cancel before any state exists + SessionPrompt.cancel(id) + expect(SessionPrompt._precancelled.has(id)).toBe(true) + + // loop() should reject with SessionCancelledError + try { + await SessionPrompt.loop({ sessionID: id }) + expect.unreachable("loop() should have thrown") + } catch (err) { + expect(SessionPrompt.SessionCancelledError.isInstance(err)).toBe(true) + } + }, + }) + }) + + test("pre-cancel is idempotent — multiple cancels before start", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const id = SessionID.make("session_precancel_idem") + + // Cancel multiple times before start + SessionPrompt.cancel(id) + SessionPrompt.cancel(id) + SessionPrompt.cancel(id) + + // Precancelled map has exactly one entry + expect(SessionPrompt._precancelled.has(id)).toBe(true) + + // Start consumes it + const signal = SessionPrompt.start(id) + expect(signal).toBeUndefined() + expect(SessionPrompt._precancelled.has(id)).toBe(false) + + // Next start works normally + const signal2 = SessionPrompt.start(id) + expect(signal2).toBeDefined() + + // Clean up + SessionPrompt.cancel(id) + }, + }) + }) + + test("cancel with generation on non-existent session is a no-op", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const spy = spyOn(SessionActivity, "remove") + try { + const id = SessionID.make("session_gen_nostate") + + // Cancel with a generation when no state exists — should be pure no-op + SessionPrompt.cancel(id, 42) + + // Must NOT add to precancelled + expect(SessionPrompt._precancelled.has(id)).toBe(false) + + // Must NOT call remove + const calls = spy.mock.calls.filter((c) => c[0] === id) + expect(calls).toHaveLength(0) + } finally { + spy.mockRestore() + } + }, + }) + }) + + test("no-op cancel still sets status to idle", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const spy = spyOn(SessionStatus, "set") + try { + const id = SessionID.make("session_noop_status") + + // Cancel with no state — should set status idle (not throw) + expect(() => SessionPrompt.cancel(id)).not.toThrow() + + // Verify SessionStatus.set was called with idle + const calls = spy.mock.calls.filter((c) => c[0] === id) + expect(calls).toHaveLength(1) + expect(calls[0][1]).toEqual({ type: "idle" }) + } finally { + spy.mockRestore() + } + }, + }) + }) + + test("callback rejection happens before abort", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const id = SessionID.make("session_cb_order") + + const signal = SessionPrompt.start(id) + expect(signal).toBeDefined() + + const order: string[] = [] + + // Listen for abort + signal!.addEventListener("abort", () => order.push("abort"), { + once: true, + }) + + // Add a callback that records when rejection happens + const entry = SessionPrompt._state()[id] + const done = new Promise((resolve) => { + entry.callbacks.push({ + resolve: () => {}, + reject: () => { + order.push("reject") + resolve() + }, + }) + }) + + SessionPrompt.cancel(id) + await done + + // Rejection must come before abort + expect(order[0]).toBe("reject") + expect(order[1]).toBe("abort") + }, + }) + }) + + test("start for already-running session returns undefined without modifying generation", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const id = SessionID.make("session_double_start") + + const signal = SessionPrompt.start(id) + expect(signal).toBeDefined() + const g = SessionPrompt._state()[id].generation + + // Second start returns undefined — already running + const signal2 = SessionPrompt.start(id) + expect(signal2).toBeUndefined() + + // Generation unchanged + expect(SessionPrompt._state()[id].generation).toBe(g) + + // Controller not affected + expect(signal!.aborted).toBe(false) + + // Clean up + SessionPrompt.cancel(id) + }, + }) + }) + + test("generations are strictly monotonic across sessions", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const ids = [ + SessionID.make("session_mono_a"), + SessionID.make("session_mono_b"), + SessionID.make("session_mono_c"), + ] + const gens: number[] = [] + + for (const id of ids) { + SessionPrompt.start(id) + gens.push(SessionPrompt._state()[id].generation) + } + + // Each generation strictly greater than the previous + for (let i = 1; i < gens.length; i++) { + expect(gens[i]).toBeGreaterThan(gens[i - 1]) + } + + // Clean up + for (const id of ids) SessionPrompt.cancel(id) + }, + }) + }) + + test("Property 7: CancelRequested event triggers cancel for active session", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const id = SessionID.make("session_cancel_propagation") + + // Wire up the subscription + SessionPrompt.init() + + // Start a session so there's state to cancel + const signal = SessionPrompt.start(id) + expect(signal).toBeDefined() + expect(signal!.aborted).toBe(false) + + // Publish a CancelRequested event (simulates abortChildren calling Bus.publish) + Bus.publish(SessionProcessor.Event.CancelRequested, { sessionID: id }) + + // Allow microtask for subscriber to fire + await new Promise((r) => setTimeout(r, 10)) + + // The session should now be cancelled + expect(signal!.aborted).toBe(true) + expect(SessionPrompt._state()[id]).toBeUndefined() + }, + }) + }) + + test("Property 7: CancelRequested event is no-op for nonexistent session", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const spy = spyOn(SessionActivity, "remove") + try { + SessionPrompt.init() + + // Publish for a session that doesn't exist + Bus.publish(SessionProcessor.Event.CancelRequested, { sessionID: SessionID.make("session_nonexistent") }) + + await new Promise((r) => setTimeout(r, 10)) + + // Should not have called remove (no state to clean up) + const calls = spy.mock.calls.filter((c) => c[0] === SessionID.make("session_nonexistent")) + expect(calls).toHaveLength(0) + } finally { + spy.mockRestore() + } + }, + }) + }) +})