Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion packages/opencode/src/permission/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ export namespace PermissionNext {
readonly ask: (input: z.infer<typeof AskInput>) => Effect.Effect<void, Error>
readonly reply: (input: z.infer<typeof ReplyInput>) => Effect.Effect<void>
readonly list: () => Effect.Effect<Request[]>
readonly rejectSession: (sessionID: SessionID) => Effect.Effect<void>
}

interface PendingEntry {
Expand Down Expand Up @@ -239,7 +240,21 @@ export namespace PermissionNext {
return Array.from(pending.values(), (item) => item.info)
})

return Service.of({ ask, reply, list })
const rejectSession = Effect.fn("Permission.rejectSession")(function* (sessionID: SessionID) {
for (const [id, entry] of pending.entries()) {
if (entry.info.sessionID !== sessionID) continue
pending.delete(id)
log.info("rejecting for cancelled session", { requestID: id, sessionID })
Bus.publish(Event.Replied, {
sessionID: entry.info.sessionID,
requestID: entry.info.id,
reply: "reject",
})
yield* Deferred.fail(entry.deferred, new RejectedError())
}
})

return Service.of({ ask, reply, list, rejectSession })
}),
)

Expand Down Expand Up @@ -277,6 +292,10 @@ export namespace PermissionNext {
return runPromiseInstance(Service.use((svc) => svc.list()))
}

export async function rejectSession(sessionID: SessionID) {
return runPromiseInstance(Service.use((svc) => svc.rejectSession(sessionID)))
}

const EDIT_TOOLS = ["edit", "write", "apply_patch", "multiedit"]

export function disabled(tools: string[], ruleset: Ruleset): Set<string> {
Expand Down
20 changes: 19 additions & 1 deletion packages/opencode/src/question/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ export namespace Question {
readonly reply: (input: { requestID: QuestionID; answers: Answer[] }) => Effect.Effect<void>
readonly reject: (requestID: QuestionID) => Effect.Effect<void>
readonly list: () => Effect.Effect<Request[]>
readonly rejectSession: (sessionID: SessionID) => Effect.Effect<void>
}

export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Question") {}
Expand Down Expand Up @@ -167,7 +168,20 @@ export namespace Question {
return Array.from(pending.values(), (x) => x.info)
})

return Service.of({ ask, reply, reject, list })
const rejectSession = Effect.fn("Question.rejectSession")(function* (sessionID: SessionID) {
for (const [id, entry] of pending.entries()) {
if (entry.info.sessionID !== sessionID) continue
pending.delete(id)
log.info("rejecting for cancelled session", { requestID: id, sessionID })
Bus.publish(Event.Rejected, {
sessionID: entry.info.sessionID,
requestID: entry.info.id,
})
yield* Deferred.fail(entry.deferred, new RejectedError())
}
})

return Service.of({ ask, reply, reject, list, rejectSession })
}),
)

Expand All @@ -190,4 +204,8 @@ export namespace Question {
export async function list(): Promise<Request[]> {
return runPromiseInstance(Service.use((svc) => svc.list()))
}

export async function rejectSession(sessionID: SessionID): Promise<void> {
return runPromiseInstance(Service.use((svc) => svc.rejectSession(sessionID)))
}
}
78 changes: 78 additions & 0 deletions packages/opencode/src/session/activity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import { Bus } from "@/bus"
import { Instance } from "@/project/instance"
import { MessageV2 } from "./message-v2"
import { Log } from "@/util/log"

const log = Log.create({ service: "session.activity" })

/**
* Tracks per-session last-activity timestamps via Bus events.
*
* Activity signals:
* - message.part.delta (token streaming — highest frequency)
* - message.part.updated (tool state changes, text completions)
*
* The watchdog queries `stale(sessionID, threshold)` to detect sessions
* that have had no activity for longer than the threshold, distinguishing
* "genuinely stuck / idle" from "actively streaming or executing tools".
*
* Root sessions (no parentID) are never subject to idle detection — only
* subagent sessions spawned by the task tool are monitored.
*/
export namespace SessionActivity {
const state = Instance.state(() => {
const data: Record<string, number> = {}
return data
})

/** Update the last-activity timestamp for a session. */
export function touch(id: string, now = Date.now()) {
state()[id] = now
}

/** Return the last-activity timestamp, or undefined if never recorded. */
export function last(id: string): number | undefined {
return state()[id]
}

/**
* True when the session has had no activity for longer than `threshold` ms.
* Returns false for sessions with no recorded activity (they haven't
* started yet, so they aren't stale).
*/
export function stale(id: string, threshold: number): boolean {
const ts = state()[id]
if (ts === undefined) return false
return Date.now() - ts > threshold
}

/** Remove tracking for a session (cleanup on cancel / completion). */
export function remove(id: string) {
delete state()[id]
}

/** Snapshot of all tracked sessions — for diagnostics / logging. */
export function list() {
return { ...state() }
}

/**
* Subscribe to Bus events that indicate session activity.
* Call once during bootstrap (idempotent per Instance lifecycle
* since Instance.state is scoped to the current instance).
*/
export function init() {
log.info("init")

// Token deltas — fires on every chunk from the LLM stream.
// This is the highest-frequency signal and acts as a natural heartbeat.
Bus.subscribe(MessageV2.Event.PartDelta, (evt) => {
touch(evt.properties.sessionID)
})

// Part state changes — tool start/complete/error, text start/end, etc.
Bus.subscribe(MessageV2.Event.PartUpdated, (evt) => {
touch(evt.properties.part.sessionID)
})
}
}
9 changes: 7 additions & 2 deletions packages/opencode/src/session/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Agent } from "@/agent/agent"
import { Snapshot } from "@/snapshot"
import { SessionSummary } from "./summary"
import { Bus } from "@/bus"
import { BusEvent } from "@/bus/bus-event"
import { SessionRetry } from "./retry"
import { SessionStatus } from "./status"
import { Plugin } from "@/plugin"
Expand All @@ -14,10 +15,14 @@ import { Config } from "@/config/config"
import { SessionCompaction } from "./compaction"
import { PermissionNext } from "@/permission"
import { Question } from "@/question"
import { PartID } from "./schema"
import type { SessionID, MessageID } from "./schema"
import { PartID, SessionID } from "./schema"
import type { MessageID } from "./schema"
import z from "zod"

export namespace SessionProcessor {
export const Event = {
CancelRequested: BusEvent.define("session.prompt.cancel", z.object({ sessionID: SessionID.zod })),
}
const DOOM_LOOP_THRESHOLD = 3
const log = Log.create({ service: "session.processor" })

Expand Down
59 changes: 49 additions & 10 deletions packages/opencode/src/session/prompt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import path from "path"
import os from "os"
import fs from "fs/promises"
import z from "zod"
import { SessionActivity } from "./activity"
import { Filesystem } from "../util/filesystem"
import { SessionID, MessageID, PartID } from "./schema"
import { MessageV2 } from "./message-v2"
Expand Down Expand Up @@ -42,6 +43,7 @@ import { SessionProcessor } from "./processor"
import { TaskTool } from "@/tool/task"
import { Tool } from "@/tool/tool"
import { PermissionNext } from "@/permission"
import { Question } from "@/question"
import { SessionStatus } from "./status"
import { LLM } from "./llm"
import { iife } from "@/util/iife"
Expand Down Expand Up @@ -76,6 +78,7 @@ export namespace SessionPrompt {
resolve(input: MessageV2.WithParts): void
reject(reason?: any): void
}[]
generation: number
}
> = {}
return data
Expand All @@ -87,6 +90,26 @@ export namespace SessionPrompt {
},
)


let gen = 0
const precancelled = new Map<SessionID, number>()

export const SessionCancelledError = NamedError.create(
"SessionCancelledError",
z.object({ sessionID: SessionID.zod }),
)

/** @internal Exported for testing */
export const _state = state
/** @internal Exported for testing */
export const _precancelled = precancelled

export function init() {
const log = Log.create({ service: "session.prompt" })
log.info("init")

Bus.subscribe(SessionProcessor.Event.CancelRequested, (evt) => cancel(evt.properties.sessionID))
}
export function assertNotBusy(sessionID: SessionID) {
const match = state()[sessionID]
if (match) throw new Session.BusyError(sessionID)
Expand Down Expand Up @@ -239,13 +262,20 @@ export namespace SessionPrompt {
return parts
}

function start(sessionID: SessionID) {
/** @internal Exported for testing */
export function start(sessionID: SessionID) {
const s = state()
if (s[sessionID]) return
if (precancelled.has(sessionID)) {
precancelled.delete(sessionID)
return
}
const controller = new AbortController()
gen++
s[sessionID] = {
abort: controller,
callbacks: [],
generation: gen,
}
return controller.signal
}
Expand All @@ -257,18 +287,25 @@ export namespace SessionPrompt {
return s[sessionID].abort.signal
}

export function cancel(sessionID: SessionID) {
log.info("cancel", { sessionID })
export function cancel(sessionID: SessionID, generation?: number) {
log.info("cancel", { sessionID, generation })
const s = state()
const match = s[sessionID]
if (!match) {
if (match) {
if (generation !== undefined && generation !== match.generation) return
for (const cb of match.callbacks) cb.reject(new SessionCancelledError({ sessionID }))
match.abort.abort()
SessionActivity.remove(sessionID)
delete s[sessionID]
// Reject any pending permission/question promises so tool calls unblock
PermissionNext.rejectSession(sessionID).catch(() => {})
Question.rejectSession(sessionID).catch(() => {})
SessionStatus.set(sessionID, { type: "idle" })
return
}
match.abort.abort()
delete s[sessionID]
if (generation !== undefined) return
precancelled.set(sessionID, Date.now())
SessionStatus.set(sessionID, { type: "idle" })
return
}

export const LoopInput = z.object({
Expand All @@ -280,13 +317,15 @@ export namespace SessionPrompt {

const abort = resume_existing ? resume(sessionID) : start(sessionID)
if (!abort) {
const entry = state()[sessionID]
if (!entry) throw new SessionCancelledError({ sessionID })
return new Promise<MessageV2.WithParts>((resolve, reject) => {
const callbacks = state()[sessionID].callbacks
callbacks.push({ resolve, reject })
entry.callbacks.push({ resolve, reject })
})
}

using _ = defer(() => cancel(sessionID))
const g = state()[sessionID].generation
using _ = defer(() => cancel(sessionID, g))

// Structured output state
// Note: On session resumption, state is reset but outputFormat is preserved
Expand Down
Loading
Loading