Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/opencode/src/acp/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const log = Log.create({ service: "acp-session-manager" })

export class ACPSessionManager {
private sessions = new Map<string, ACPSessionState>()
get size() { return this.sessions.size }
private sdk: OpencodeClient

constructor(sdk: OpencodeClient) {
Expand Down
13 changes: 11 additions & 2 deletions packages/opencode/src/bus/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { makeRuntime } from "@/effect/run-service"

export namespace Bus {
const log = Log.create({ service: "bus" })
const stats = { subscriptions: 0, unsubscriptions: 0 }

export const InstanceDisposed = BusEvent.define(
"server.instance.disposed",
Expand Down Expand Up @@ -99,28 +100,31 @@ export namespace Bus {
}

function subscribe<D extends BusEvent.Definition>(def: D): Stream.Stream<Payload<D>> {
stats.subscriptions++
log.info("subscribing", { type: def.type })
return Stream.unwrap(
Effect.gen(function* () {
const s = yield* InstanceState.get(state)
const ps = yield* getOrCreate(s, def)
return Stream.fromPubSub(ps)
}),
).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: def.type }))))
).pipe(Stream.ensuring(Effect.sync(() => { stats.unsubscriptions++; log.info("unsubscribing", { type: def.type }) })))
}

function subscribeAll(): Stream.Stream<Payload> {
stats.subscriptions++
log.info("subscribing", { type: "*" })
return Stream.unwrap(
Effect.gen(function* () {
const s = yield* InstanceState.get(state)
return Stream.fromPubSub(s.wildcard)
}),
).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: "*" }))))
).pipe(Stream.ensuring(Effect.sync(() => { stats.unsubscriptions++; log.info("unsubscribing", { type: "*" }) })))
}

function on<T>(pubsub: PubSub.PubSub<T>, type: string, callback: (event: T) => unknown) {
return Effect.gen(function* () {
stats.subscriptions++
log.info("subscribing", { type })
const scope = yield* Scope.make()
const subscription = yield* Scope.provide(scope)(PubSub.subscribe(pubsub))
Expand All @@ -140,6 +144,7 @@ export namespace Bus {
)

return () => {
stats.unsubscriptions++
log.info("unsubscribing", { type })
Effect.runFork(Scope.close(scope, Exit.void))
}
Expand Down Expand Up @@ -182,4 +187,8 @@ export namespace Bus {
export function subscribeAll(callback: (event: any) => unknown) {
return runSync((svc) => svc.subscribeAllCallback(callback))
}

export function diagnostics() {
return { ...stats, active: stats.subscriptions - stats.unsubscriptions }
}
}
3 changes: 3 additions & 0 deletions packages/opencode/src/cli/cmd/acp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { ACP } from "@/acp/agent"
import { Server } from "@/server/server"
import { createOpencodeClient } from "@opencode-ai/sdk/v2"
import { withNetworkOptions, resolveNetworkOptions } from "../network"
import { registerSignals, start as startMemoryTelemetry } from "@/telemetry/memory"

const log = Log.create({ service: "acp-command" })

Expand All @@ -24,6 +25,8 @@ export const AcpCommand = cmd({
await bootstrap(process.cwd(), async () => {
const opts = await resolveNetworkOptions(args)
const server = Server.listen(opts)
registerSignals()
startMemoryTelemetry()

const sdk = createOpencodeClient({
baseUrl: `http://${server.hostname}:${server.port}`,
Expand Down
38 changes: 38 additions & 0 deletions packages/opencode/src/cli/cmd/serve.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ import { Flag } from "../../flag/flag"
import { Workspace } from "../../control-plane/workspace"
import { Project } from "../../project/project"
import { Installation } from "../../installation"
import { registerSignals, start as startMemoryTelemetry, setGauges } from "@/telemetry/memory"
import { Instance } from "@/project/instance"
import { SessionPrompt } from "@/session/prompt"
import { Bus } from "@/bus"
import { gauges as trackerGauges } from "@/telemetry/tracker"
import * as fs from "node:fs"

export const ServeCommand = cmd({
command: "serve",
Expand All @@ -16,6 +22,38 @@ export const ServeCommand = cmd({
}
const opts = await resolveNetworkOptions(args)
const server = Server.listen(opts)
registerSignals()
startMemoryTelemetry()
setGauges(() => {
const prompt = SessionPrompt.stateSize()
const bus = Bus.diagnostics()
const dirs = Instance.directories()
// Count child processes and zombies on Linux
let children = 0
let zombies = 0
try {
const kids = fs.readFileSync(`/proc/${process.pid}/task/${process.pid}/children`, "utf8").trim()
if (kids) {
const pids = kids.split(" ")
children = pids.length
for (const pid of pids) {
try {
const stat = fs.readFileSync(`/proc/${pid}/stat`, "utf8")
if (stat.includes(") Z")) zombies++
} catch {}
}
}
} catch {}
return {
instances: dirs.length,
prompt_sessions: prompt.sessions,
prompt_pending: prompt.pending,
bus_active: bus.active,
children,
zombies,
...trackerGauges(),
}
})
console.log(`opencode server listening on http://${server.hostname}:${server.port}`)

await new Promise(() => {})
Expand Down
4 changes: 4 additions & 0 deletions packages/opencode/src/cli/cmd/tui/app.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import { TuiConfigProvider, useTuiConfig } from "./context/tui-config"
import { TuiConfig } from "@/config/tui"
import { createTuiApi, TuiPluginRuntime, type RouteMap } from "./plugin"
import { FormatError, FormatUnknownError } from "@/cli/error"
import { registerSignals, start as startMemoryTelemetry } from "@/telemetry/memory"

async function getTerminalBackgroundColor(): Promise<"dark" | "light"> {
// can't set raw mode if not a TTY
Expand Down Expand Up @@ -172,6 +173,9 @@ export function tui(input: {
}) {
// promise to prevent immediate exit
return new Promise<void>(async (resolve) => {
registerSignals()
startMemoryTelemetry()

const unguard = win32InstallCtrlCGuard()
win32DisableProcessedInput()

Expand Down
3 changes: 3 additions & 0 deletions packages/opencode/src/cli/cmd/web.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { withNetworkOptions, resolveNetworkOptions } from "../network"
import { Flag } from "../../flag/flag"
import open from "open"
import { networkInterfaces } from "os"
import { registerSignals, start as startMemoryTelemetry } from "@/telemetry/memory"

function getNetworkIPs() {
const nets = networkInterfaces()
Expand Down Expand Up @@ -38,6 +39,8 @@ export const WebCommand = cmd({
}
const opts = await resolveNetworkOptions(args)
const server = Server.listen(opts)
registerSignals()
startMemoryTelemetry()
UI.empty()
UI.println(UI.logo(" "))
UI.empty()
Expand Down
5 changes: 5 additions & 0 deletions packages/opencode/src/project/instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,9 @@ export const Instance = {

return disposal.all
},

/** Diagnostic: return list of active instance directories */
directories() {
return [...cache.keys()]
},
}
1 change: 1 addition & 0 deletions packages/opencode/src/pty/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ export namespace Pty {
} catch {}
}
session.subscribers.clear()
session.buffer = ""
}

const state = yield* InstanceState.make<State>(
Expand Down
12 changes: 9 additions & 3 deletions packages/opencode/src/session/compaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { Plugin } from "@/plugin"
import { Config } from "@/config/config"
import { NotFoundError } from "@/storage/db"
import { ModelID, ProviderID } from "@/provider/schema"
import { track } from "@/telemetry/tracker"
import { Effect, Layer, ServiceMap } from "effect"
import { makeRuntime } from "@/effect/run-service"
import { InstanceState } from "@/effect/instance-state"
Expand Down Expand Up @@ -95,9 +96,12 @@ export namespace SessionCompaction {
if (cfg.compaction?.prune === false) return
log.info("pruning")

const msgs = yield* session
.messages({ sessionID: input.sessionID })
.pipe(Effect.catchIf(NotFoundError.isInstance, () => Effect.succeed(undefined)))
const msgs = yield* Effect.promise(() =>
track("compaction.messages", Session.messages({ sessionID: input.sessionID })).catch((err) => {
if (NotFoundError.isInstance(err)) return undefined
throw err
}),
)
if (!msgs) return

let total = 0
Expand Down Expand Up @@ -131,6 +135,8 @@ export namespace SessionCompaction {
for (const part of toPrune) {
if (part.state.status === "completed") {
part.state.time.compacted = Date.now()
part.state.output = ""
part.state.attachments = undefined
yield* session.updatePart(part)
}
}
Expand Down
8 changes: 7 additions & 1 deletion packages/opencode/src/session/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,13 @@ export namespace Session {
}

export const children = fn(SessionID.zod, (id) => runPromise((svc) => svc.children(id)))
export const remove = fn(SessionID.zod, (id) => runPromise((svc) => svc.remove(id)))
export const remove = fn(SessionID.zod, async (sessionID) => {
log.info("removing session", { sessionID, promptState: SessionPrompt.stateSize() })
const result = await runPromise((svc) => svc.remove(sessionID))
await SessionPrompt.cancel(sessionID).catch(() => {})
log.info("session removed", { sessionID, promptState: SessionPrompt.stateSize() })
return result
})
export async function updateMessage<T extends MessageV2.Info>(msg: T): Promise<T> {
MessageV2.Info.parse(msg)
return runPromise((svc) => svc.updateMessage(msg))
Expand Down
14 changes: 10 additions & 4 deletions packages/opencode/src/session/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { isOverflow } from "./overflow"
import { PartID } from "./schema"
import type { SessionID } from "./schema"
import { SessionRetry } from "./retry"
import { fire } from "@/telemetry/tracker"
import { SessionStatus } from "./status"
import { SessionSummary } from "./summary"
import type { Provider } from "@/provider/provider"
Expand Down Expand Up @@ -298,10 +299,15 @@ export namespace SessionProcessor {
}
ctx.snapshot = undefined
}
SessionSummary.summarize({
sessionID: ctx.sessionID,
messageID: ctx.assistantMessage.parentID,
})
yield* Effect.sync(() =>
fire(
"processor.summarize",
SessionSummary.summarize({
sessionID: ctx.sessionID,
messageID: ctx.assistantMessage.parentID,
}),
),
)
if (
!ctx.assistantMessage.summary &&
isOverflow({ cfg: yield* config.get(), tokens: usage.tokens, model: ctx.model })
Expand Down
27 changes: 23 additions & 4 deletions packages/opencode/src/session/prompt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import { AppFileSystem } from "@/filesystem"
import { Truncate } from "@/tool/truncate"
import { decodeDataUrl } from "@/util/data-url"
import { Process } from "@/util/process"
import { fire, trackEffect } from "@/telemetry/tracker"
import { Cause, Effect, Exit, Layer, Option, Scope, ServiceMap } from "effect"
import { InstanceState } from "@/effect/instance-state"
import { makeRuntime } from "@/effect/run-service"
Expand All @@ -70,6 +71,7 @@ export namespace SessionPrompt {
export interface Interface {
readonly assertNotBusy: (sessionID: SessionID) => Effect.Effect<void, Session.BusyError>
readonly cancel: (sessionID: SessionID) => Effect.Effect<void>
readonly stateSize: () => Effect.Effect<{ sessions: number; ids: string[]; pending: number }>
readonly prompt: (input: PromptInput) => Effect.Effect<MessageV2.WithParts>
readonly loop: (input: z.infer<typeof LoopInput>) => Effect.Effect<MessageV2.WithParts>
readonly shell: (input: ShellInput) => Effect.Effect<MessageV2.WithParts>
Expand Down Expand Up @@ -152,6 +154,13 @@ export namespace SessionPrompt {
yield* runner.cancel
})

const stateSize: Interface["stateSize"] = () =>
InstanceState.use(state, (s) => ({
sessions: s.runners.size,
ids: [...s.runners.keys()],
pending: [...s.runners.values()].filter((r) => r.busy).length,
}))

const resolvePromptParts = Effect.fn("SessionPrompt.resolvePromptParts")(function* (template: string) {
const ctx = yield* InstanceState.context
const parts: PromptInput["parts"] = [{ type: "text", text: template }]
Expand Down Expand Up @@ -1345,7 +1354,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
yield* status.set(sessionID, { type: "busy" })
log.info("loop", { step, sessionID })

let msgs = yield* MessageV2.filterCompactedEffect(sessionID)
let msgs = yield* trackEffect("prompt.messages", MessageV2.filterCompactedEffect(sessionID))

let lastUser: MessageV2.User | undefined
let lastAssistant: MessageV2.Assistant | undefined
Expand Down Expand Up @@ -1476,7 +1485,11 @@ NOTE: At any point in time through this workflow you should feel free to ask the
})
}

if (step === 1) SessionSummary.summarize({ sessionID, messageID: lastUser.id })
if (step === 1) {
yield* Effect.sync(() =>
fire("prompt.summarize", SessionSummary.summarize({ sessionID, messageID: lastUser.id })),
)
}

if (step > 1 && lastFinished) {
for (const m of msgs) {
Expand Down Expand Up @@ -1699,6 +1712,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
return Service.of({
assertNotBusy,
cancel,
stateSize,
prompt,
loop,
shell,
Expand All @@ -1708,7 +1722,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
}),
)

const defaultLayer = Layer.unwrap(
const baseLayer: unknown = Layer.unwrap(
Effect.sync(() =>
layer.pipe(
Layer.provide(SessionStatus.layer),
Expand All @@ -1732,7 +1746,8 @@ NOTE: At any point in time through this workflow you should feel free to ask the
),
),
)
const { runPromise } = makeRuntime(Service, defaultLayer)
const defaultLayer = baseLayer as unknown as Layer.Layer<Service>
const { runPromise, runSync } = makeRuntime(Service, defaultLayer)

export async function assertNotBusy(sessionID: SessionID) {
return runPromise((svc) => svc.assertNotBusy(SessionID.zod.parse(sessionID)))
Expand Down Expand Up @@ -1817,6 +1832,10 @@ NOTE: At any point in time through this workflow you should feel free to ask the
return runPromise((svc) => svc.cancel(SessionID.zod.parse(sessionID)))
}

export function stateSize() {
return runSync((svc) => svc.stateSize())
}

export const LoopInput = z.object({
sessionID: SessionID.zod,
})
Expand Down
8 changes: 5 additions & 3 deletions packages/opencode/src/session/summary.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { Storage } from "@/storage/storage"
import { Session } from "."
import { MessageV2 } from "./message-v2"
import { SessionID, MessageID } from "./schema"
import { trackEffect } from "@/telemetry/tracker"

export namespace SessionSummary {
function unquoteGitPath(input: string) {
Expand Down Expand Up @@ -107,7 +108,7 @@ export namespace SessionSummary {
sessionID: SessionID
messageID: MessageID
}) {
const all = yield* sessions.messages({ sessionID: input.sessionID })
const all = yield* trackEffect("summary.messages", sessions.messages({ sessionID: input.sessionID }))
if (!all.length) return

const diffs = yield* computeDiff({ messages: all })
Expand Down Expand Up @@ -163,8 +164,9 @@ export namespace SessionSummary {

const { runPromise } = makeRuntime(Service, defaultLayer)

export const summarize = (input: { sessionID: SessionID; messageID: MessageID }) =>
void runPromise((svc) => svc.summarize(input)).catch(() => {})
export function summarize(input: { sessionID: SessionID; messageID: MessageID }) {
return runPromise((svc) => svc.summarize(input)).catch(() => {})
}

export const DiffInput = z.object({
sessionID: SessionID.zod,
Expand Down
Loading
Loading