diff --git a/packages/opencode/src/bus/bus-event.ts b/packages/opencode/src/bus/bus-event.ts index d97922290eb2..53b4a8f6935c 100644 --- a/packages/opencode/src/bus/bus-event.ts +++ b/packages/opencode/src/bus/bus-event.ts @@ -6,6 +6,24 @@ export namespace BusEvent { const registry = new Map() + export const StreamLagged = define( + "server.stream.lagged", + z.object({ + limit: z.number(), + queued: z.number(), + dropped: z.number(), + }), + ) + + export const StreamExpired = define( + "server.stream.expired", + z.object({ + next: z.number(), + oldest: z.number(), + latest: z.number(), + }), + ) + export function define(type: Type, properties: Properties) { const result = { type, diff --git a/packages/opencode/src/flag/flag.ts b/packages/opencode/src/flag/flag.ts index 27190f2eb24e..3d28519fb07b 100644 --- a/packages/opencode/src/flag/flag.ts +++ b/packages/opencode/src/flag/flag.ts @@ -62,6 +62,7 @@ export namespace Flag { truthy("OPENCODE_ENABLE_EXA") || OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_EXA") export const OPENCODE_EXPERIMENTAL_BASH_DEFAULT_TIMEOUT_MS = number("OPENCODE_EXPERIMENTAL_BASH_DEFAULT_TIMEOUT_MS") export const OPENCODE_EXPERIMENTAL_OUTPUT_TOKEN_MAX = number("OPENCODE_EXPERIMENTAL_OUTPUT_TOKEN_MAX") + export const OPENCODE_EXPERIMENTAL_EVENT_QUEUE_MAX = number("OPENCODE_EXPERIMENTAL_EVENT_QUEUE_MAX") export const OPENCODE_EXPERIMENTAL_OXFMT = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_OXFMT") export const OPENCODE_EXPERIMENTAL_LSP_TY = truthy("OPENCODE_EXPERIMENTAL_LSP_TY") export const OPENCODE_EXPERIMENTAL_LSP_TOOL = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_LSP_TOOL") diff --git a/packages/opencode/src/server/routes/event.ts b/packages/opencode/src/server/routes/event.ts index 989b857710d2..900f8fa81dbb 100644 --- a/packages/opencode/src/server/routes/event.ts +++ b/packages/opencode/src/server/routes/event.ts @@ -5,9 +5,50 @@ import { Log } from "@/util/log" import { BusEvent } from "@/bus/bus-event" import { Bus } from "@/bus" import { AsyncQueue } from "../../util/queue" +import { Database, and, asc, gt, lte, sql } from "@/storage/db" +import { EventTable } from "@/sync/event.sql" +import { EventID } from "@/sync/schema" +import { Flag } from "@/flag/flag" const log = Log.create({ service: "server" }) +function max() { + return Flag.OPENCODE_EXPERIMENTAL_EVENT_QUEUE_MAX ?? 1000 +} + +function parse(input?: string) { + if (!input) return + const value = Number(input) + if (!Number.isSafeInteger(value)) return + if (value < 0) return + return value +} + +function replay(next: number) { + return Database.use((db) => { + const oldest = db.select({ seq: sql`min(${EventTable.seq})` }).from(EventTable).get()?.seq ?? 0 + const latest = db.select({ seq: sql`max(${EventTable.seq})` }).from(EventTable).get()?.seq ?? 0 + if (next < oldest) { + return { + rows: [] as Array<{ id: string; type: string; data: Record }>, + oldest, + latest, + } + } + const rows = db + .select({ + id: EventTable.id, + type: EventTable.type, + data: EventTable.data, + }) + .from(EventTable) + .where(and(gt(EventTable.seq, next), lte(EventTable.seq, latest))) + .orderBy(asc(EventTable.seq)) + .all() + return { rows, oldest, latest } + }) +} + export const EventRoutes = () => new Hono().get( "/event", @@ -27,24 +68,96 @@ export const EventRoutes = () => }, }), async (c) => { + const raw = c.req.query("after_seq") + const after = parse(raw ?? undefined) log.info("event connected") c.header("Cache-Control", "no-cache, no-transform") c.header("X-Accel-Buffering", "no") c.header("X-Content-Type-Options", "nosniff") return streamSSE(c, async (stream) => { - const q = new AsyncQueue() + if (raw !== undefined) { + await stream.writeSSE({ + data: JSON.stringify({ + type: "server.connected", + properties: {}, + }), + }) + + if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES || after === undefined) { + await stream.writeSSE({ + data: JSON.stringify({ + type: BusEvent.StreamExpired.type, + properties: { + next: after ?? -1, + oldest: 0, + latest: 0, + }, + }), + }) + return + } + + const data = replay(after) + if (after < data.oldest) { + await stream.writeSSE({ + data: JSON.stringify({ + type: BusEvent.StreamExpired.type, + properties: { + next: after, + oldest: data.oldest, + latest: data.latest, + }, + }), + }) + return + } + + for (const row of data.rows) { + await stream.writeSSE({ + data: JSON.stringify({ + id: row.id, + type: row.type, + properties: row.data, + }), + }) + } + return + } + + const limit = max() + const q = new AsyncQueue({ max: limit }) let done = false + let dropped = 0 + + function push(data: string, input?: { force?: boolean }) { + if (q.push(data, input)) return + dropped++ + q.clear() + q.push( + JSON.stringify({ + type: BusEvent.StreamLagged.type, + properties: { + limit, + queued: q.size(), + dropped, + }, + }), + { force: true }, + ) + q.push(null, { force: true }) + } - q.push( + push( JSON.stringify({ type: "server.connected", properties: {}, }), + { force: true }, ) // Send heartbeat every 10s to prevent stalled proxy streams. const heartbeat = setInterval(() => { - q.push( + push( JSON.stringify({ type: "server.heartbeat", properties: {}, @@ -57,12 +170,18 @@ export const EventRoutes = () => done = true clearInterval(heartbeat) unsub() - q.push(null) + q.push(null, { force: true }) log.info("event disconnected") } const unsub = Bus.subscribeAll((event) => { - q.push(JSON.stringify(event)) + const id = EventID.ascending() + push( + JSON.stringify({ + id, + ...event, + }), + ) if (event.type === Bus.InstanceDisposed.type) { stop() } diff --git a/packages/opencode/src/util/queue.ts b/packages/opencode/src/util/queue.ts index a1af53fe8f09..6899485f638f 100644 --- a/packages/opencode/src/util/queue.ts +++ b/packages/opencode/src/util/queue.ts @@ -1,11 +1,23 @@ export class AsyncQueue implements AsyncIterable { private queue: T[] = [] private resolvers: ((value: T) => void)[] = [] + private max: number - push(item: T) { + constructor(input?: { max?: number }) { + this.max = input?.max ?? Number.POSITIVE_INFINITY + } + + push(item: T, input?: { force?: boolean }) { const resolve = this.resolvers.shift() - if (resolve) resolve(item) - else this.queue.push(item) + if (resolve) { + resolve(item) + return true + } + if (!input?.force && this.queue.length >= this.max) { + return false + } + this.queue.push(item) + return true } async next(): Promise { @@ -13,9 +25,17 @@ export class AsyncQueue implements AsyncIterable { return new Promise((resolve) => this.resolvers.push(resolve)) } + clear() { + this.queue.length = 0 + } + async *[Symbol.asyncIterator]() { while (true) yield await this.next() } + + size() { + return this.queue.length + } } export async function work(concurrency: number, items: T[], fn: (item: T) => Promise) { diff --git a/packages/opencode/test/server/event-subscribe.test.ts b/packages/opencode/test/server/event-subscribe.test.ts new file mode 100644 index 000000000000..7d2a97ce4697 --- /dev/null +++ b/packages/opencode/test/server/event-subscribe.test.ts @@ -0,0 +1,157 @@ +import { afterEach, beforeEach, describe, expect, test } from "bun:test" +import { tmpdir } from "../fixture/fixture" +import { resetDatabase } from "../fixture/db" +import { Instance } from "../../src/project/instance" +import { Server } from "../../src/server/server" +import { Session } from "../../src/session" +import { Flag } from "../../src/flag/flag" +import { Database, eq } from "../../src/storage/db" +import { EventTable } from "../../src/sync/event.sql" +import { Bus } from "../../src/bus" + +const workspaces = Flag.OPENCODE_EXPERIMENTAL_WORKSPACES +const queue = Flag.OPENCODE_EXPERIMENTAL_EVENT_QUEUE_MAX + +beforeEach(async () => { + await resetDatabase() +}) + +afterEach(() => { + // @ts-expect-error test override + Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = workspaces + // @ts-expect-error test override + Flag.OPENCODE_EXPERIMENTAL_EVENT_QUEUE_MAX = queue +}) + +describe("server /event", () => { + test("returns stream.expired when replay is requested and workspaces are disabled", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + // @ts-expect-error test override + Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = false + await Session.create({ title: "event-expired" }) + + const app = Server.Default() + const res = await app.request("/event?after_seq=0") + expect(res.status).toBe(200) + const body = await res.text() + expect(body).toContain("server.stream.expired") + }, + }) + }) + + test("returns stream.expired when replay cursor is older than oldest event", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + // @ts-expect-error test override + Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true + const session = await Session.create({ title: "event-old-cursor" }) + await Session.setTitle({ sessionID: session.id, title: "event-old-cursor-2" }) + Database.use((db) => db.delete(EventTable).where(eq(EventTable.seq, 0)).run()) + + const app = Server.Default() + const res = await app.request("/event?after_seq=0") + expect(res.status).toBe(200) + const body = await res.text() + expect(body).toContain("server.stream.expired") + }, + }) + }) + + test("replays existing events when cursor is valid", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + // @ts-expect-error test override + Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true + const session = await Session.create({ title: "event-replay" }) + await Session.setTitle({ sessionID: session.id, title: "event-replay-2" }) + + const app = Server.Default() + const res = await app.request("/event?after_seq=0") + expect(res.status).toBe(200) + const body = await res.text() + expect(body).toContain("session.updated") + }, + }) + }) + + test("returns stream.expired for malformed replay cursor", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + // @ts-expect-error test override + Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true + const app = Server.Default() + const res = await app.request("/event?after_seq=bad") + expect(res.status).toBe(200) + const body = await res.text() + expect(body).toContain("server.stream.expired") + }, + }) + }) + + test("keeps replay contract with tiny queue size", async () => { + await using tmp = await tmpdir({ git: true }) + await Instance.provide({ + directory: tmp.path, + fn: async () => { + // @ts-expect-error test override + Flag.OPENCODE_EXPERIMENTAL_EVENT_QUEUE_MAX = 1 + + // @ts-expect-error test override + Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = false + const app = Server.Default() + const expired = await app.request("/event?after_seq=0") + expect(expired.status).toBe(200) + expect(await expired.text()).toContain("server.stream.expired") + + // @ts-expect-error test override + Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true + const session = await Session.create({ title: "event-small-queue" }) + await Session.setTitle({ sessionID: session.id, title: "event-small-queue-2" }) + + const replay = await app.request("/event?after_seq=0") + expect(replay.status).toBe(200) + const body = await replay.text() + expect(body).toContain("session.updated") + }, + }) + }) + + test("emits stream.lagged on overflow", async () => { + await using tmp = await tmpdir({ git: true }) + const original = Bus.subscribeAll + await Instance.provide({ + directory: tmp.path, + fn: async () => { + // @ts-expect-error test override + Flag.OPENCODE_EXPERIMENTAL_EVENT_QUEUE_MAX = 1 + const app = Server.Default() + + Bus.subscribeAll = (callback: (event: any) => unknown) => { + for (let i = 0; i < 2000; i++) { + callback({ + type: "test.event.overflow", + properties: { index: i, value: "x".repeat(128) }, + }) + } + return () => {} + } + + const stream = await app.request("/event") + + expect(stream.status).toBe(200) + const body = await stream.text() + expect(body).toContain("server.stream.lagged") + }, + }) + Bus.subscribeAll = original + }) +})