From 76911fe8b39cf3d631cd310c1252f62061807fd5 Mon Sep 17 00:00:00 2001 From: Sami Jawhar Date: Thu, 2 Apr 2026 15:58:40 +0000 Subject: [PATCH] feat: memory telemetry, serve diagnostics, and memory leak port --- packages/opencode/src/acp/session.ts | 1 + packages/opencode/src/bus/index.ts | 13 +- packages/opencode/src/cli/cmd/acp.ts | 3 + packages/opencode/src/cli/cmd/serve.ts | 38 +++ packages/opencode/src/cli/cmd/tui/app.tsx | 4 + packages/opencode/src/cli/cmd/web.ts | 3 + packages/opencode/src/project/instance.ts | 5 + packages/opencode/src/pty/index.ts | 1 + packages/opencode/src/session/compaction.ts | 12 +- packages/opencode/src/session/index.ts | 8 +- packages/opencode/src/session/processor.ts | 14 +- packages/opencode/src/session/prompt.ts | 27 +- packages/opencode/src/session/summary.ts | 8 +- packages/opencode/src/share/share-next.ts | 3 +- packages/opencode/src/telemetry/memory.ts | 291 ++++++++++++++++++++ packages/opencode/src/telemetry/tracker.ts | 51 ++++ 16 files changed, 464 insertions(+), 18 deletions(-) create mode 100644 packages/opencode/src/telemetry/memory.ts create mode 100644 packages/opencode/src/telemetry/tracker.ts diff --git a/packages/opencode/src/acp/session.ts b/packages/opencode/src/acp/session.ts index b96ebc1c8952..afc399d08059 100644 --- a/packages/opencode/src/acp/session.ts +++ b/packages/opencode/src/acp/session.ts @@ -7,6 +7,7 @@ const log = Log.create({ service: "acp-session-manager" }) export class ACPSessionManager { private sessions = new Map() + get size() { return this.sessions.size } private sdk: OpencodeClient constructor(sdk: OpencodeClient) { diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index fe26a6672e03..f1892ac3da4f 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -9,6 +9,7 @@ import { makeRuntime } from "@/effect/run-service" export namespace Bus { const log = Log.create({ service: "bus" }) + const stats = { subscriptions: 0, unsubscriptions: 0 } export const InstanceDisposed = BusEvent.define( "server.instance.disposed", @@ -99,6 +100,7 @@ export namespace Bus { } function subscribe(def: D): Stream.Stream> { + stats.subscriptions++ log.info("subscribing", { type: def.type }) return Stream.unwrap( Effect.gen(function* () { @@ -106,21 +108,23 @@ export namespace Bus { const ps = yield* getOrCreate(s, def) return Stream.fromPubSub(ps) }), - ).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: def.type })))) + ).pipe(Stream.ensuring(Effect.sync(() => { stats.unsubscriptions++; log.info("unsubscribing", { type: def.type }) }))) } function subscribeAll(): Stream.Stream { + stats.subscriptions++ log.info("subscribing", { type: "*" }) return Stream.unwrap( Effect.gen(function* () { const s = yield* InstanceState.get(state) return Stream.fromPubSub(s.wildcard) }), - ).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: "*" })))) + ).pipe(Stream.ensuring(Effect.sync(() => { stats.unsubscriptions++; log.info("unsubscribing", { type: "*" }) }))) } function on(pubsub: PubSub.PubSub, type: string, callback: (event: T) => unknown) { return Effect.gen(function* () { + stats.subscriptions++ log.info("subscribing", { type }) const scope = yield* Scope.make() const subscription = yield* Scope.provide(scope)(PubSub.subscribe(pubsub)) @@ -140,6 +144,7 @@ export namespace Bus { ) return () => { + stats.unsubscriptions++ log.info("unsubscribing", { type }) Effect.runFork(Scope.close(scope, Exit.void)) } @@ -182,4 +187,8 @@ export namespace Bus { export function subscribeAll(callback: (event: any) => unknown) { return runSync((svc) => svc.subscribeAllCallback(callback)) } + + export function diagnostics() { + return { ...stats, active: stats.subscriptions - stats.unsubscriptions } + } } diff --git a/packages/opencode/src/cli/cmd/acp.ts b/packages/opencode/src/cli/cmd/acp.ts index 99a9a81ab9cd..b2727074be41 100644 --- a/packages/opencode/src/cli/cmd/acp.ts +++ b/packages/opencode/src/cli/cmd/acp.ts @@ -6,6 +6,7 @@ import { ACP } from "@/acp/agent" import { Server } from "@/server/server" import { createOpencodeClient } from "@opencode-ai/sdk/v2" import { withNetworkOptions, resolveNetworkOptions } from "../network" +import { registerSignals, start as startMemoryTelemetry } from "@/telemetry/memory" const log = Log.create({ service: "acp-command" }) @@ -24,6 +25,8 @@ export const AcpCommand = cmd({ await bootstrap(process.cwd(), async () => { const opts = await resolveNetworkOptions(args) const server = Server.listen(opts) + registerSignals() + startMemoryTelemetry() const sdk = createOpencodeClient({ baseUrl: `http://${server.hostname}:${server.port}`, diff --git a/packages/opencode/src/cli/cmd/serve.ts b/packages/opencode/src/cli/cmd/serve.ts index ab51fe8c3e3b..0855ce2bc40f 100644 --- a/packages/opencode/src/cli/cmd/serve.ts +++ b/packages/opencode/src/cli/cmd/serve.ts @@ -5,6 +5,12 @@ import { Flag } from "../../flag/flag" import { Workspace } from "../../control-plane/workspace" import { Project } from "../../project/project" import { Installation } from "../../installation" +import { registerSignals, start as startMemoryTelemetry, setGauges } from "@/telemetry/memory" +import { Instance } from "@/project/instance" +import { SessionPrompt } from "@/session/prompt" +import { Bus } from "@/bus" +import { gauges as trackerGauges } from "@/telemetry/tracker" +import * as fs from "node:fs" export const ServeCommand = cmd({ command: "serve", @@ -16,6 +22,38 @@ export const ServeCommand = cmd({ } const opts = await resolveNetworkOptions(args) const server = Server.listen(opts) + registerSignals() + startMemoryTelemetry() + setGauges(() => { + const prompt = SessionPrompt.stateSize() + const bus = Bus.diagnostics() + const dirs = Instance.directories() + // Count child processes and zombies on Linux + let children = 0 + let zombies = 0 + try { + const kids = fs.readFileSync(`/proc/${process.pid}/task/${process.pid}/children`, "utf8").trim() + if (kids) { + const pids = kids.split(" ") + children = pids.length + for (const pid of pids) { + try { + const stat = fs.readFileSync(`/proc/${pid}/stat`, "utf8") + if (stat.includes(") Z")) zombies++ + } catch {} + } + } + } catch {} + return { + instances: dirs.length, + prompt_sessions: prompt.sessions, + prompt_pending: prompt.pending, + bus_active: bus.active, + children, + zombies, + ...trackerGauges(), + } + }) console.log(`opencode server listening on http://${server.hostname}:${server.port}`) await new Promise(() => {}) diff --git a/packages/opencode/src/cli/cmd/tui/app.tsx b/packages/opencode/src/cli/cmd/tui/app.tsx index 93d1fc19ae2b..76426b9f780a 100644 --- a/packages/opencode/src/cli/cmd/tui/app.tsx +++ b/packages/opencode/src/cli/cmd/tui/app.tsx @@ -59,6 +59,7 @@ import { TuiConfigProvider, useTuiConfig } from "./context/tui-config" import { TuiConfig } from "@/config/tui" import { createTuiApi, TuiPluginRuntime, type RouteMap } from "./plugin" import { FormatError, FormatUnknownError } from "@/cli/error" +import { registerSignals, start as startMemoryTelemetry } from "@/telemetry/memory" async function getTerminalBackgroundColor(): Promise<"dark" | "light"> { // can't set raw mode if not a TTY @@ -172,6 +173,9 @@ export function tui(input: { }) { // promise to prevent immediate exit return new Promise(async (resolve) => { + registerSignals() + startMemoryTelemetry() + const unguard = win32InstallCtrlCGuard() win32DisableProcessedInput() diff --git a/packages/opencode/src/cli/cmd/web.ts b/packages/opencode/src/cli/cmd/web.ts index 0fe056f21f2f..3348f4816f48 100644 --- a/packages/opencode/src/cli/cmd/web.ts +++ b/packages/opencode/src/cli/cmd/web.ts @@ -5,6 +5,7 @@ import { withNetworkOptions, resolveNetworkOptions } from "../network" import { Flag } from "../../flag/flag" import open from "open" import { networkInterfaces } from "os" +import { registerSignals, start as startMemoryTelemetry } from "@/telemetry/memory" function getNetworkIPs() { const nets = networkInterfaces() @@ -38,6 +39,8 @@ export const WebCommand = cmd({ } const opts = await resolveNetworkOptions(args) const server = Server.listen(opts) + registerSignals() + startMemoryTelemetry() UI.empty() UI.println(UI.logo(" ")) UI.empty() diff --git a/packages/opencode/src/project/instance.ts b/packages/opencode/src/project/instance.ts index a0d6f2414a85..c524d53b9a00 100644 --- a/packages/opencode/src/project/instance.ts +++ b/packages/opencode/src/project/instance.ts @@ -172,4 +172,9 @@ export const Instance = { return disposal.all }, + + /** Diagnostic: return list of active instance directories */ + directories() { + return [...cache.keys()] + }, } diff --git a/packages/opencode/src/pty/index.ts b/packages/opencode/src/pty/index.ts index 72089d8441ea..7fe1f703d675 100644 --- a/packages/opencode/src/pty/index.ts +++ b/packages/opencode/src/pty/index.ts @@ -128,6 +128,7 @@ export namespace Pty { } catch {} } session.subscribers.clear() + session.buffer = "" } const state = yield* InstanceState.make( diff --git a/packages/opencode/src/session/compaction.ts b/packages/opencode/src/session/compaction.ts index 3158393f1145..b8ab9ca530d9 100644 --- a/packages/opencode/src/session/compaction.ts +++ b/packages/opencode/src/session/compaction.ts @@ -15,6 +15,7 @@ import { Plugin } from "@/plugin" import { Config } from "@/config/config" import { NotFoundError } from "@/storage/db" import { ModelID, ProviderID } from "@/provider/schema" +import { track } from "@/telemetry/tracker" import { Effect, Layer, ServiceMap } from "effect" import { makeRuntime } from "@/effect/run-service" import { InstanceState } from "@/effect/instance-state" @@ -95,9 +96,12 @@ export namespace SessionCompaction { if (cfg.compaction?.prune === false) return log.info("pruning") - const msgs = yield* session - .messages({ sessionID: input.sessionID }) - .pipe(Effect.catchIf(NotFoundError.isInstance, () => Effect.succeed(undefined))) + const msgs = yield* Effect.promise(() => + track("compaction.messages", Session.messages({ sessionID: input.sessionID })).catch((err) => { + if (NotFoundError.isInstance(err)) return undefined + throw err + }), + ) if (!msgs) return let total = 0 @@ -131,6 +135,8 @@ export namespace SessionCompaction { for (const part of toPrune) { if (part.state.status === "completed") { part.state.time.compacted = Date.now() + part.state.output = "" + part.state.attachments = undefined yield* session.updatePart(part) } } diff --git a/packages/opencode/src/session/index.ts b/packages/opencode/src/session/index.ts index 41fad1a9d483..0cc15d09e822 100644 --- a/packages/opencode/src/session/index.ts +++ b/packages/opencode/src/session/index.ts @@ -849,7 +849,13 @@ export namespace Session { } export const children = fn(SessionID.zod, (id) => runPromise((svc) => svc.children(id))) - export const remove = fn(SessionID.zod, (id) => runPromise((svc) => svc.remove(id))) + export const remove = fn(SessionID.zod, async (sessionID) => { + log.info("removing session", { sessionID, promptState: SessionPrompt.stateSize() }) + const result = await runPromise((svc) => svc.remove(sessionID)) + await SessionPrompt.cancel(sessionID).catch(() => {}) + log.info("session removed", { sessionID, promptState: SessionPrompt.stateSize() }) + return result + }) export async function updateMessage(msg: T): Promise { MessageV2.Info.parse(msg) return runPromise((svc) => svc.updateMessage(msg)) diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index b1a1b8dbd3d1..8a21bb829f7f 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -14,6 +14,7 @@ import { isOverflow } from "./overflow" import { PartID } from "./schema" import type { SessionID } from "./schema" import { SessionRetry } from "./retry" +import { fire } from "@/telemetry/tracker" import { SessionStatus } from "./status" import { SessionSummary } from "./summary" import type { Provider } from "@/provider/provider" @@ -298,10 +299,15 @@ export namespace SessionProcessor { } ctx.snapshot = undefined } - SessionSummary.summarize({ - sessionID: ctx.sessionID, - messageID: ctx.assistantMessage.parentID, - }) + yield* Effect.sync(() => + fire( + "processor.summarize", + SessionSummary.summarize({ + sessionID: ctx.sessionID, + messageID: ctx.assistantMessage.parentID, + }), + ), + ) if ( !ctx.assistantMessage.summary && isOverflow({ cfg: yield* config.get(), tokens: usage.tokens, model: ctx.model }) diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index 436847ed4e3c..39973100496b 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -47,6 +47,7 @@ import { AppFileSystem } from "@/filesystem" import { Truncate } from "@/tool/truncate" import { decodeDataUrl } from "@/util/data-url" import { Process } from "@/util/process" +import { fire, trackEffect } from "@/telemetry/tracker" import { Cause, Effect, Exit, Layer, Option, Scope, ServiceMap } from "effect" import { InstanceState } from "@/effect/instance-state" import { makeRuntime } from "@/effect/run-service" @@ -70,6 +71,7 @@ export namespace SessionPrompt { export interface Interface { readonly assertNotBusy: (sessionID: SessionID) => Effect.Effect readonly cancel: (sessionID: SessionID) => Effect.Effect + readonly stateSize: () => Effect.Effect<{ sessions: number; ids: string[]; pending: number }> readonly prompt: (input: PromptInput) => Effect.Effect readonly loop: (input: z.infer) => Effect.Effect readonly shell: (input: ShellInput) => Effect.Effect @@ -152,6 +154,13 @@ export namespace SessionPrompt { yield* runner.cancel }) + const stateSize: Interface["stateSize"] = () => + InstanceState.use(state, (s) => ({ + sessions: s.runners.size, + ids: [...s.runners.keys()], + pending: [...s.runners.values()].filter((r) => r.busy).length, + })) + const resolvePromptParts = Effect.fn("SessionPrompt.resolvePromptParts")(function* (template: string) { const ctx = yield* InstanceState.context const parts: PromptInput["parts"] = [{ type: "text", text: template }] @@ -1345,7 +1354,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the yield* status.set(sessionID, { type: "busy" }) log.info("loop", { step, sessionID }) - let msgs = yield* MessageV2.filterCompactedEffect(sessionID) + let msgs = yield* trackEffect("prompt.messages", MessageV2.filterCompactedEffect(sessionID)) let lastUser: MessageV2.User | undefined let lastAssistant: MessageV2.Assistant | undefined @@ -1476,7 +1485,11 @@ NOTE: At any point in time through this workflow you should feel free to ask the }) } - if (step === 1) SessionSummary.summarize({ sessionID, messageID: lastUser.id }) + if (step === 1) { + yield* Effect.sync(() => + fire("prompt.summarize", SessionSummary.summarize({ sessionID, messageID: lastUser.id })), + ) + } if (step > 1 && lastFinished) { for (const m of msgs) { @@ -1699,6 +1712,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the return Service.of({ assertNotBusy, cancel, + stateSize, prompt, loop, shell, @@ -1708,7 +1722,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the }), ) - const defaultLayer = Layer.unwrap( + const baseLayer: unknown = Layer.unwrap( Effect.sync(() => layer.pipe( Layer.provide(SessionStatus.layer), @@ -1732,7 +1746,8 @@ NOTE: At any point in time through this workflow you should feel free to ask the ), ), ) - const { runPromise } = makeRuntime(Service, defaultLayer) + const defaultLayer = baseLayer as unknown as Layer.Layer + const { runPromise, runSync } = makeRuntime(Service, defaultLayer) export async function assertNotBusy(sessionID: SessionID) { return runPromise((svc) => svc.assertNotBusy(SessionID.zod.parse(sessionID))) @@ -1817,6 +1832,10 @@ NOTE: At any point in time through this workflow you should feel free to ask the return runPromise((svc) => svc.cancel(SessionID.zod.parse(sessionID))) } + export function stateSize() { + return runSync((svc) => svc.stateSize()) + } + export const LoopInput = z.object({ sessionID: SessionID.zod, }) diff --git a/packages/opencode/src/session/summary.ts b/packages/opencode/src/session/summary.ts index d26a00d49339..e110244a4281 100644 --- a/packages/opencode/src/session/summary.ts +++ b/packages/opencode/src/session/summary.ts @@ -7,6 +7,7 @@ import { Storage } from "@/storage/storage" import { Session } from "." import { MessageV2 } from "./message-v2" import { SessionID, MessageID } from "./schema" +import { trackEffect } from "@/telemetry/tracker" export namespace SessionSummary { function unquoteGitPath(input: string) { @@ -107,7 +108,7 @@ export namespace SessionSummary { sessionID: SessionID messageID: MessageID }) { - const all = yield* sessions.messages({ sessionID: input.sessionID }) + const all = yield* trackEffect("summary.messages", sessions.messages({ sessionID: input.sessionID })) if (!all.length) return const diffs = yield* computeDiff({ messages: all }) @@ -163,8 +164,9 @@ export namespace SessionSummary { const { runPromise } = makeRuntime(Service, defaultLayer) - export const summarize = (input: { sessionID: SessionID; messageID: MessageID }) => - void runPromise((svc) => svc.summarize(input)).catch(() => {}) + export function summarize(input: { sessionID: SessionID; messageID: MessageID }) { + return runPromise((svc) => svc.summarize(input)).catch(() => {}) + } export const DiffInput = z.object({ sessionID: SessionID.zod, diff --git a/packages/opencode/src/share/share-next.ts b/packages/opencode/src/share/share-next.ts index 2a11094f8074..9ac0eaf87633 100644 --- a/packages/opencode/src/share/share-next.ts +++ b/packages/opencode/src/share/share-next.ts @@ -4,6 +4,7 @@ import { Config } from "@/config/config" import { Provider } from "@/provider/provider" import { ProviderID, ModelID } from "@/provider/schema" import { Session } from "@/session" +import { track } from "@/telemetry/tracker" import type { SessionID } from "@/session/schema" import { MessageV2 } from "@/session/message-v2" import { Database, eq } from "@/storage/db" @@ -253,7 +254,7 @@ export namespace ShareNext { log.info("full sync", { sessionID }) const session = await Session.get(sessionID) const diffs = await Session.diff(sessionID) - const messages = await Array.fromAsync(MessageV2.stream(sessionID)) + const messages = await track("share.messages", Array.fromAsync(MessageV2.stream(sessionID))) const models = await Promise.all( Array.from( new Map( diff --git a/packages/opencode/src/telemetry/memory.ts b/packages/opencode/src/telemetry/memory.ts new file mode 100644 index 000000000000..f6af71e9bd3c --- /dev/null +++ b/packages/opencode/src/telemetry/memory.ts @@ -0,0 +1,291 @@ +import { Log } from "@/util/log" +import { Global } from "@/global" +import { heapStats } from "bun:jsc" +import * as fs from "node:fs" +import * as v8 from "node:v8" + +const log = Log.create({ service: "memory" }) +const step = 5_000 +const span = 12 +const limit = 50 * 1024 * 1024 +const spike = 500 * 1024 * 1024 +const gc = process.env.OPENCODE_FORCE_GC === "1" +const tty = process.stderr.isTTY + +function emit(msg: string) { + if (!tty) process.stderr.write(msg + "\n") +} + +function shrink() { + if (typeof Bun !== "undefined" && typeof Bun.shrink === "function") Bun.shrink() +} + +type Smaps = { + anon: number + file: number + swap: number + shared: number +} + +type Snap = { + at: string + rss: number + rss_proc?: number + heap_total: number + heap_used: number + count: number + types: Record + smaps?: Smaps +} + +let timer: ReturnType | undefined +let prev: Snap | undefined +let base: Snap | undefined +let tick = 0 +let busy = false +let usr1: (() => void) | undefined +let usr2: (() => void) | undefined + +let gauges: (() => Record) | undefined + +/** Register a function that returns app-level counters to include in every memory sample */ +export function setGauges(fn: () => Record) { + gauges = fn +} +function toMB(n: number) { + return Number((n / 1024 / 1024).toFixed(1)) +} + +function proc() { + if (process.platform !== "linux") return undefined + try { + const text = fs.readFileSync("/proc/self/status", "utf8") + const m = text.match(/^VmRSS:\s+(\d+)\s+kB$/m) + if (!m) return undefined + return Number(m[1]) * 1024 + } catch { + return undefined + } +} + +function smaps(): Smaps | undefined { + if (process.platform !== "linux") return undefined + try { + const text = fs.readFileSync("/proc/self/smaps_rollup", "utf8") + const get = (key: string) => { + const m = text.match(new RegExp(`^${key}:\\s+(\\d+)\\s+kB$`, "m")) + return m ? Number(m[1]) * 1024 : 0 + } + return { + anon: get("Pss_Anon"), + file: get("Pss_File"), + swap: get("Swap"), + shared: get("Shared_Clean") + get("Shared_Dirty"), + } + } catch { + return undefined + } +} + +function top(next: Record, last?: Record) { + return Object.entries(next) + .map(([name, count]) => { + return { + name, + delta: count - (last?.[name] ?? 0), + } + }) + .filter((row) => row.delta > 0) + .sort((a, b) => b.delta - a.delta) + .slice(0, 5) +} + +function take() { + if (gc) Bun.gc(false) + const mem = process.memoryUsage() + const stat = heapStats() + const rss_proc = proc() + const maps = smaps() + const types = Object.fromEntries(Object.entries(stat.objectTypeCounts).map(([k, v]) => [k, Number(v)])) + return { + at: new Date().toISOString(), + rss: mem.rss, + rss_proc, + heap_total: mem.heapTotal, + heap_used: mem.heapUsed, + count: Number(stat.objectCount), + types, + smaps: maps, + } satisfies Snap +} + +function snap(sig: string) { + if (busy) return + const rss = process.memoryUsage.rss() + const dir = Global.Path.log + try { + const stat = fs.statfsSync(dir) + const avail = stat.bavail * stat.bsize + if (avail < 512 * 1024 * 1024) { + const msg = `skip heap snapshot: ${dir} has ${toMB(avail)}MB free (need 512MB)` + log.warn(msg) + emit(msg) + return + } + } catch { + const msg = `skip heap snapshot: cannot check ${dir} disk space` + log.warn(msg) + emit(msg) + return + } + busy = true + try { + if (gc) Bun.gc(true) + const path = `${dir}/opencode-heap-${process.pid}-${Date.now()}.heapsnapshot` + log.info("heap snapshot started", { signal: sig, path, rss_mb: toMB(rss) }) + emit(`heap snapshot started: ${path} (signal: ${sig})`) + const file = v8.writeHeapSnapshot(path) + const msg = `heap snapshot written: ${file} (signal: ${sig})` + log.info(msg, { signal: sig, path: file }) + emit(msg) + } finally { + busy = false + } +} + +function smapsFmt(s?: Smaps) { + if (!s) return "" + return ` anon=${toMB(s.anon)}MB file=${toMB(s.file)}MB swap=${toMB(s.swap)}MB shared=${toMB(s.shared)}MB native=${toMB(s.anon - (process.memoryUsage().heapUsed))}MB` +} + +function smapsFields(s?: Smaps) { + if (!s) return {} + return { + anon_mb: toMB(s.anon), + file_mb: toMB(s.file), + swap_mb: toMB(s.swap), + shared_mb: toMB(s.shared), + native_mb: toMB(s.anon - process.memoryUsage().heapUsed), + } +} + +function reap() { + if (process.platform !== "linux") return + try { + // Reap zombie children — Bun doesn't always waitpid for spawned processes + const children = fs.readFileSync("/proc/self/task/" + process.pid + "/children", "utf8").trim() + if (!children) return + // Just reading /proc/self/stat of children triggers zombie collection in some cases. + // For actual reaping we'd need native waitpid, but Bun.spawnSync with a no-op works: + for (const pid of children.split(" ")) { + try { fs.readFileSync(`/proc/${pid}/stat`, "utf8") } catch {} + } + } catch {} +} + +function one(sig: string) { + const mem = process.memoryUsage() + const shot = take() + log.info("memory signal sample", { + signal: sig, + timestamp: shot.at, + rss_mb: toMB(mem.rss), + rss_proc_mb: shot.rss_proc ? toMB(shot.rss_proc) : undefined, + heap_mb: toMB(mem.heapUsed), + heap_total_mb: toMB(mem.heapTotal), + object_count: shot.count, + ...smapsFields(shot.smaps), + ...gauges?.(), + }) + const extra = gauges ? " " + Object.entries(gauges()).map(([k, v]) => `${k}=${v}`).join(" ") : "" + emit(`memory signal sample: rss=${toMB(mem.rss)}MB heap=${toMB(mem.heapUsed)}MB objects=${shot.count}${smapsFmt(shot.smaps)} signal=${sig}${extra}`) + shrink() +} + +export function registerSignals(): void { + if (usr1 && usr2) return + usr1 = () => { + snap("SIGUSR1") + } + usr2 = () => { + one("SIGUSR2") + } + process.on("SIGUSR1", usr1) + process.on("SIGUSR2", usr2) +} + +function sample() { + const next = take() + const last = prev + const first = base + const grows = top(next.types, last?.types) + const real = next.rss_proc ?? next.rss + const prev_real = last?.rss_proc ?? last?.rss ?? real + const delta = real - prev_real + const sum = tick % span === 0 + const grace = tick < span + const spiked = !grace && delta > spike + if ((!grace && delta > limit) || sum || spiked) { + log.info(sum ? "memory summary" : spiked ? "memory spike" : "memory growth", { + timestamp: next.at, + rss_mb: toMB(real), + rss_proc_mb: next.rss_proc ? toMB(next.rss_proc) : undefined, + heap_mb: toMB(next.heap_used), + heap_total_mb: toMB(next.heap_total), + object_count: next.count, + rss_delta_mb: toMB(delta), + rss_since_start_mb: first ? toMB(real - (first.rss_proc ?? first.rss)) : undefined, + ...smapsFields(next.smaps), + top_types: grows, + ...gauges?.(), + }) + const label = sum ? "summary" : spiked ? "spike" : "growth" + const extra = gauges ? " " + Object.entries(gauges()).map(([k, v]) => `${k}=${v}`).join(" ") : "" + emit(`memory ${label}: rss=${toMB(real)}MB delta=${toMB(delta)}MB heap=${toMB(next.heap_used)}MB${smapsFmt(next.smaps)}${extra}`) + // Auto heap snapshot on spikes (>500MB in one interval) + if (spiked && process.env.OPENCODE_AUTO_HEAP_SNAPSHOT === "1") { + snap("auto-spike") + } + } + // Periodically reap zombie children and release bmalloc hoarded memory. + // Bun.shrink() calls JSC::VM::shrinkFootprintWhenIdle() which triggers + // pas_scavenger_decommit_free_memory() to return freed segments to the OS. + // Without this, bmalloc retains large heap segments indefinitely (bun#28318). + if (tick % span === 0) { + reap() + shrink() + } + prev = next + if (!base) base = next + tick += 1 +} + +export function start(): void { + registerSignals() + if (timer) return + tick = 0 + prev = undefined + base = undefined + const first = take() + prev = first + base = first + log.info("memory baseline", { + timestamp: first.at, + rss_mb: toMB(first.rss), + rss_proc_mb: first.rss_proc ? toMB(first.rss_proc) : undefined, + heap_mb: toMB(first.heap_used), + heap_total_mb: toMB(first.heap_total), + object_count: first.count, + ...smapsFields(first.smaps), + top_types: top(first.types, {}), + }) + emit(`memory baseline: rss=${toMB(first.rss)}MB heap=${toMB(first.heap_used)}MB objects=${first.count}${smapsFmt(first.smaps)}`) + timer = setInterval(sample, step) + timer.unref?.() +} + +export function stop(): void { + if (!timer) return + clearInterval(timer) + timer = undefined +} diff --git a/packages/opencode/src/telemetry/tracker.ts b/packages/opencode/src/telemetry/tracker.ts new file mode 100644 index 000000000000..ebe1a8c2c10b --- /dev/null +++ b/packages/opencode/src/telemetry/tracker.ts @@ -0,0 +1,51 @@ +import { Log } from "@/util/log" +import { Effect } from "effect" + +const log = Log.create({ service: "tracker" }) +const active = new Map() + +export function track(label: string, promise: Promise): Promise { + const count = (active.get(label) ?? 0) + 1 + active.set(label, count) + return promise.finally(() => { + const next = (active.get(label) ?? 1) - 1 + if (next <= 0) active.delete(label) + else active.set(label, next) + }) +} + +export function trackEffect(label: string, effect: Effect.Effect): Effect.Effect { + const count = (active.get(label) ?? 0) + 1 + active.set(label, count) + return effect.pipe( + Effect.ensuring( + Effect.sync(() => { + const next = (active.get(label) ?? 1) - 1 + if (next <= 0) active.delete(label) + else active.set(label, next) + }), + ), + ) +} + +export function fire(label: string, promise: Promise) { + const count = (active.get(label) ?? 0) + 1 + active.set(label, count) + promise + .catch((e) => { + log.error("fire-and-forget rejected", { label, error: e instanceof Error ? e.message : String(e) }) + }) + .finally(() => { + const next = (active.get(label) ?? 1) - 1 + if (next <= 0) active.delete(label) + else active.set(label, next) + }) +} + +export function gauges(): Record { + const result: Record = {} + for (const [k, v] of active) { + result["track_" + k] = v + } + return result +}