From 21048d438ac1e61e157f1e5c6cd61afc8e54405b Mon Sep 17 00:00:00 2001 From: Andres Caicedo Date: Sat, 28 Mar 2026 21:18:50 -0500 Subject: [PATCH 1/2] fix: harden SSE replay and backpressure handling --- packages/opencode/src/bus/bus-event.ts | 18 ++ packages/opencode/src/flag/flag.ts | 1 + packages/opencode/src/server/routes/event.ts | 129 +++++++++++++- packages/opencode/src/util/queue.ts | 26 ++- .../test/server/event-subscribe.test.ts | 157 ++++++++++++++++++ 5 files changed, 323 insertions(+), 8 deletions(-) create mode 100644 packages/opencode/test/server/event-subscribe.test.ts diff --git a/packages/opencode/src/bus/bus-event.ts b/packages/opencode/src/bus/bus-event.ts index d97922290eb2..53b4a8f6935c 100644 --- a/packages/opencode/src/bus/bus-event.ts +++ b/packages/opencode/src/bus/bus-event.ts @@ -6,6 +6,24 @@ export namespace BusEvent { const registry = new Map() + export const StreamLagged = define( + "server.stream.lagged", + z.object({ + limit: z.number(), + queued: z.number(), + dropped: z.number(), + }), + ) + + export const StreamExpired = define( + "server.stream.expired", + z.object({ + next: z.number(), + oldest: z.number(), + latest: z.number(), + }), + ) + export function define(type: Type, properties: Properties) { const result = { type, diff --git a/packages/opencode/src/flag/flag.ts b/packages/opencode/src/flag/flag.ts index 27190f2eb24e..3d28519fb07b 100644 --- a/packages/opencode/src/flag/flag.ts +++ b/packages/opencode/src/flag/flag.ts @@ -62,6 +62,7 @@ export namespace Flag { truthy("OPENCODE_ENABLE_EXA") || OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_EXA") export const OPENCODE_EXPERIMENTAL_BASH_DEFAULT_TIMEOUT_MS = number("OPENCODE_EXPERIMENTAL_BASH_DEFAULT_TIMEOUT_MS") export const OPENCODE_EXPERIMENTAL_OUTPUT_TOKEN_MAX = number("OPENCODE_EXPERIMENTAL_OUTPUT_TOKEN_MAX") + export const OPENCODE_EXPERIMENTAL_EVENT_QUEUE_MAX = number("OPENCODE_EXPERIMENTAL_EVENT_QUEUE_MAX") export const OPENCODE_EXPERIMENTAL_OXFMT = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_OXFMT") export const OPENCODE_EXPERIMENTAL_LSP_TY = truthy("OPENCODE_EXPERIMENTAL_LSP_TY") export const OPENCODE_EXPERIMENTAL_LSP_TOOL = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_LSP_TOOL") diff --git a/packages/opencode/src/server/routes/event.ts b/packages/opencode/src/server/routes/event.ts index 989b857710d2..900f8fa81dbb 100644 --- a/packages/opencode/src/server/routes/event.ts +++ b/packages/opencode/src/server/routes/event.ts @@ -5,9 +5,50 @@ import { Log } from "@/util/log" import { BusEvent } from "@/bus/bus-event" import { Bus } from "@/bus" import { AsyncQueue } from "../../util/queue" +import { Database, and, asc, gt, lte, sql } from "@/storage/db" +import { EventTable } from "@/sync/event.sql" +import { EventID } from "@/sync/schema" +import { Flag } from "@/flag/flag" const log = Log.create({ service: "server" }) +function max() { + return Flag.OPENCODE_EXPERIMENTAL_EVENT_QUEUE_MAX ?? 1000 +} + +function parse(input?: string) { + if (!input) return + const value = Number(input) + if (!Number.isSafeInteger(value)) return + if (value < 0) return + return value +} + +function replay(next: number) { + return Database.use((db) => { + const oldest = db.select({ seq: sql`min(${EventTable.seq})` }).from(EventTable).get()?.seq ?? 0 + const latest = db.select({ seq: sql`max(${EventTable.seq})` }).from(EventTable).get()?.seq ?? 0 + if (next < oldest) { + return { + rows: [] as Array<{ id: string; type: string; data: Record }>, + oldest, + latest, + } + } + const rows = db + .select({ + id: EventTable.id, + type: EventTable.type, + data: EventTable.data, + }) + .from(EventTable) + .where(and(gt(EventTable.seq, next), lte(EventTable.seq, latest))) + .orderBy(asc(EventTable.seq)) + .all() + return { rows, oldest, latest } + }) +} + export const EventRoutes = () => new Hono().get( "/event", @@ -27,24 +68,96 @@ export const EventRoutes = () => }, }), async (c) => { + const raw = c.req.query("after_seq") + const after = parse(raw ?? undefined) log.info("event connected") c.header("Cache-Control", "no-cache, no-transform") c.header("X-Accel-Buffering", "no") c.header("X-Content-Type-Options", "nosniff") return streamSSE(c, async (stream) => { - const q = new AsyncQueue() + if (raw !== undefined) { + await stream.writeSSE({ + data: JSON.stringify({ + type: "server.connected", + properties: {}, + }), + }) + + if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES || after === undefined) { + await stream.writeSSE({ + data: JSON.stringify({ + type: BusEvent.StreamExpired.type, + properties: { + next: after ?? -1, + oldest: 0, + latest: 0, + }, + }), + }) + return + } + + const data = replay(after) + if (after < data.oldest) { + await stream.writeSSE({ + data: JSON.stringify({ + type: BusEvent.StreamExpired.type, + properties: { + next: after, + oldest: data.oldest, + latest: data.latest, + }, + }), + }) + return + } + + for (const row of data.rows) { + await stream.writeSSE({ + data: JSON.stringify({ + id: row.id, + type: row.type, + properties: row.data, + }), + }) + } + return + } + + const limit = max() + const q = new AsyncQueue({ max: limit }) let done = false + let dropped = 0 + + function push(data: string, input?: { force?: boolean }) { + if (q.push(data, input)) return + dropped++ + q.clear() + q.push( + JSON.stringify({ + type: BusEvent.StreamLagged.type, + properties: { + limit, + queued: q.size(), + dropped, + }, + }), + { force: true }, + ) + q.push(null, { force: true }) + } - q.push( + push( JSON.stringify({ type: "server.connected", properties: {}, }), + { force: true }, ) // Send heartbeat every 10s to prevent stalled proxy streams. const heartbeat = setInterval(() => { - q.push( + push( JSON.stringify({ type: "server.heartbeat", properties: {}, @@ -57,12 +170,18 @@ export const EventRoutes = () => done = true clearInterval(heartbeat) unsub() - q.push(null) + q.push(null, { force: true }) log.info("event disconnected") } const unsub = Bus.subscribeAll((event) => { - q.push(JSON.stringify(event)) + const id = EventID.ascending() + push( + JSON.stringify({ + id, + ...event, + }), + ) if (event.type === Bus.InstanceDisposed.type) { stop() } diff --git a/packages/opencode/src/util/queue.ts b/packages/opencode/src/util/queue.ts index a1af53fe8f09..6899485f638f 100644 --- a/packages/opencode/src/util/queue.ts +++ b/packages/opencode/src/util/queue.ts @@ -1,11 +1,23 @@ export class AsyncQueue implements AsyncIterable { private queue: T[] = [] private resolvers: ((value: T) => void)[] = [] + private max: number - push(item: T) { + constructor(input?: { max?: number }) { + this.max = input?.max ?? Number.POSITIVE_INFINITY + } + + push(item: T, input?: { force?: boolean }) { const resolve = this.resolvers.shift() - if (resolve) resolve(item) - else this.queue.push(item) + if (resolve) { + resolve(item) + return true + } + if (!input?.force && this.queue.length >= this.max) { + return false + } + this.queue.push(item) + return true } async next(): Promise { @@ -13,9 +25,17 @@ export class AsyncQueue implements AsyncIterable { return new Promise((resolve) => this.resolvers.push(resolve)) } + clear() { + this.queue.length = 0 + } + async *[Symbol.asyncIterator]() { while (true) yield await this.next() } + + size() { + return this.queue.length + } } export async function work(concurrency: number, items: T[], fn: (item: T) => Promise) { diff --git a/packages/opencode/test/server/event-subscribe.test.ts b/packages/opencode/test/server/event-subscribe.test.ts new file mode 100644 index 000000000000..7d2a97ce4697 --- /dev/null +++ b/packages/opencode/test/server/event-subscribe.test.ts @@ -0,0 +1,157 @@ +import { afterEach, beforeEach, describe, expect, test } from "bun:test" +import { tmpdir } from "../fixture/fixture" +import { resetDatabase } from "../fixture/db" +import { Instance } from "../../src/project/instance" +import { Server } from "../../src/server/server" +import { Session } from "../../src/session" +import { Flag } from "../../src/flag/flag" +import { Database, eq } from "../../src/storage/db" +import { EventTable } from "../../src/sync/event.sql" +import { Bus } from "../../src/bus" + +const workspaces = Flag.OPENCODE_EXPERIMENTAL_WORKSPACES +const queue = Flag.OPENCODE_EXPERIMENTAL_EVENT_QUEUE_MAX + +beforeEach(async () => { + await resetDatabase() +}) + +afterEach(() => { + // @ts-expect-error test override + Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = workspaces + // @ts-expect-error test override + Flag.OPENCODE_EXPERIMENTAL_EVENT_QUEUE_MAX = queue +}) + +describe("server /event", () => { + test("returns stream.expired when replay is requested and workspaces are disabled", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + // @ts-expect-error test override + Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = false + await Session.create({ title: "event-expired" }) + + const app = Server.Default() + const res = await app.request("/event?after_seq=0") + expect(res.status).toBe(200) + const body = await res.text() + expect(body).toContain("server.stream.expired") + }, + }) + }) + + test("returns stream.expired when replay cursor is older than oldest event", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + // @ts-expect-error test override + Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true + const session = await Session.create({ title: "event-old-cursor" }) + await Session.setTitle({ sessionID: session.id, title: "event-old-cursor-2" }) + Database.use((db) => db.delete(EventTable).where(eq(EventTable.seq, 0)).run()) + + const app = Server.Default() + const res = await app.request("/event?after_seq=0") + expect(res.status).toBe(200) + const body = await res.text() + expect(body).toContain("server.stream.expired") + }, + }) + }) + + test("replays existing events when cursor is valid", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + // @ts-expect-error test override + Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true + const session = await Session.create({ title: "event-replay" }) + await Session.setTitle({ sessionID: session.id, title: "event-replay-2" }) + + const app = Server.Default() + const res = await app.request("/event?after_seq=0") + expect(res.status).toBe(200) + const body = await res.text() + expect(body).toContain("session.updated") + }, + }) + }) + + test("returns stream.expired for malformed replay cursor", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + // @ts-expect-error test override + Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true + const app = Server.Default() + const res = await app.request("/event?after_seq=bad") + expect(res.status).toBe(200) + const body = await res.text() + expect(body).toContain("server.stream.expired") + }, + }) + }) + + test("keeps replay contract with tiny queue size", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + // @ts-expect-error test override + Flag.OPENCODE_EXPERIMENTAL_EVENT_QUEUE_MAX = 1 + + // @ts-expect-error test override + Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = false + const app = Server.Default() + const expired = await app.request("/event?after_seq=0") + expect(expired.status).toBe(200) + expect(await expired.text()).toContain("server.stream.expired") + + // @ts-expect-error test override + Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true + const session = await Session.create({ title: "event-small-queue" }) + await Session.setTitle({ sessionID: session.id, title: "event-small-queue-2" }) + + const replay = await app.request("/event?after_seq=0") + expect(replay.status).toBe(200) + const body = await replay.text() + expect(body).toContain("session.updated") + }, + }) + }) + + test("emits stream.lagged on overflow", async () => { + await using tmp = await tmpdir({ git: true }) + const original = Bus.subscribeAll + await Instance.provide({ + directory: tmp.path, + fn: async () => { + // @ts-expect-error test override + Flag.OPENCODE_EXPERIMENTAL_EVENT_QUEUE_MAX = 1 + const app = Server.Default() + + Bus.subscribeAll = (callback: (event: any) => unknown) => { + for (let i = 0; i < 2000; i++) { + callback({ + type: "test.event.overflow", + properties: { index: i, value: "x".repeat(128) }, + }) + } + return () => {} + } + + const stream = await app.request("/event") + + expect(stream.status).toBe(200) + const body = await stream.text() + expect(body).toContain("server.stream.lagged") + }, + }) + Bus.subscribeAll = original + }) +}) From ba1f6839f431e21d7617c6bfaa6948755c431b01 Mon Sep 17 00:00:00 2001 From: Luke Parker <10430890+Hona@users.noreply.github.com> Date: Sun, 29 Mar 2026 13:38:53 +1000 Subject: [PATCH 2/2] test: add regression coverage for sync plugin hooks (#19589) --- packages/opencode/test/plugin/trigger.test.ts | 111 ++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 packages/opencode/test/plugin/trigger.test.ts diff --git a/packages/opencode/test/plugin/trigger.test.ts b/packages/opencode/test/plugin/trigger.test.ts new file mode 100644 index 000000000000..d89d930c69ea --- /dev/null +++ b/packages/opencode/test/plugin/trigger.test.ts @@ -0,0 +1,111 @@ +import { afterAll, afterEach, describe, expect, test } from "bun:test" +import path from "path" +import { pathToFileURL } from "url" +import { tmpdir } from "../fixture/fixture" + +const disableDefault = process.env.OPENCODE_DISABLE_DEFAULT_PLUGINS +process.env.OPENCODE_DISABLE_DEFAULT_PLUGINS = "1" + +const { Plugin } = await import("../../src/plugin/index") +const { Instance } = await import("../../src/project/instance") + +afterEach(async () => { + await Instance.disposeAll() +}) + +afterAll(() => { + if (disableDefault === undefined) { + delete process.env.OPENCODE_DISABLE_DEFAULT_PLUGINS + return + } + process.env.OPENCODE_DISABLE_DEFAULT_PLUGINS = disableDefault +}) + +async function project(source: string) { + return tmpdir({ + init: async (dir) => { + const file = path.join(dir, "plugin.ts") + await Bun.write(file, source) + await Bun.write( + path.join(dir, "opencode.json"), + JSON.stringify( + { + $schema: "https://opencode.ai/config.json", + plugin: [pathToFileURL(file).href], + }, + null, + 2, + ), + ) + }, + }) +} + +describe("plugin.trigger", () => { + test("runs synchronous hooks without crashing", async () => { + await using tmp = await project( + [ + "export default async () => ({", + ' "experimental.chat.system.transform": (_input, output) => {', + ' output.system.unshift("sync")', + " },", + "})", + "", + ].join("\n"), + ) + + const out = await Instance.provide({ + directory: tmp.path, + fn: async () => { + const out = { system: [] as string[] } + await Plugin.trigger( + "experimental.chat.system.transform", + { + model: { + providerID: "anthropic", + modelID: "claude-sonnet-4-6", + } as any, + }, + out, + ) + return out + }, + }) + + expect(out.system).toEqual(["sync"]) + }) + + test("awaits asynchronous hooks", async () => { + await using tmp = await project( + [ + "export default async () => ({", + ' "experimental.chat.system.transform": async (_input, output) => {', + " await Bun.sleep(1)", + ' output.system.unshift("async")', + " },", + "})", + "", + ].join("\n"), + ) + + const out = await Instance.provide({ + directory: tmp.path, + fn: async () => { + const out = { system: [] as string[] } + await Plugin.trigger( + "experimental.chat.system.transform", + { + model: { + providerID: "anthropic", + modelID: "claude-sonnet-4-6", + } as any, + }, + out, + ) + return out + }, + }) + + expect(out.system).toEqual(["async"]) + }) +})