Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions packages/opencode/src/bus/bus-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,24 @@ export namespace BusEvent {

const registry = new Map<string, Definition>()

export const StreamLagged = define(
"server.stream.lagged",
z.object({
limit: z.number(),
queued: z.number(),
dropped: z.number(),
}),
)

export const StreamExpired = define(
"server.stream.expired",
z.object({
next: z.number(),
oldest: z.number(),
latest: z.number(),
}),
)

export function define<Type extends string, Properties extends ZodType>(type: Type, properties: Properties) {
const result = {
type,
Expand Down
1 change: 1 addition & 0 deletions packages/opencode/src/flag/flag.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ export namespace Flag {
truthy("OPENCODE_ENABLE_EXA") || OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_EXA")
export const OPENCODE_EXPERIMENTAL_BASH_DEFAULT_TIMEOUT_MS = number("OPENCODE_EXPERIMENTAL_BASH_DEFAULT_TIMEOUT_MS")
export const OPENCODE_EXPERIMENTAL_OUTPUT_TOKEN_MAX = number("OPENCODE_EXPERIMENTAL_OUTPUT_TOKEN_MAX")
export const OPENCODE_EXPERIMENTAL_EVENT_QUEUE_MAX = number("OPENCODE_EXPERIMENTAL_EVENT_QUEUE_MAX")
export const OPENCODE_EXPERIMENTAL_OXFMT = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_OXFMT")
export const OPENCODE_EXPERIMENTAL_LSP_TY = truthy("OPENCODE_EXPERIMENTAL_LSP_TY")
export const OPENCODE_EXPERIMENTAL_LSP_TOOL = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_LSP_TOOL")
Expand Down
129 changes: 124 additions & 5 deletions packages/opencode/src/server/routes/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,50 @@ import { Log } from "@/util/log"
import { BusEvent } from "@/bus/bus-event"
import { Bus } from "@/bus"
import { AsyncQueue } from "../../util/queue"
import { Database, and, asc, gt, lte, sql } from "@/storage/db"
import { EventTable } from "@/sync/event.sql"
import { EventID } from "@/sync/schema"
import { Flag } from "@/flag/flag"

const log = Log.create({ service: "server" })

function max() {
return Flag.OPENCODE_EXPERIMENTAL_EVENT_QUEUE_MAX ?? 1000
}

function parse(input?: string) {
if (!input) return
const value = Number(input)
if (!Number.isSafeInteger(value)) return
if (value < 0) return
return value
}

function replay(next: number) {
return Database.use((db) => {
const oldest = db.select({ seq: sql<number>`min(${EventTable.seq})` }).from(EventTable).get()?.seq ?? 0
const latest = db.select({ seq: sql<number>`max(${EventTable.seq})` }).from(EventTable).get()?.seq ?? 0
if (next < oldest) {
return {
rows: [] as Array<{ id: string; type: string; data: Record<string, unknown> }>,
oldest,
latest,
}
}
const rows = db
.select({
id: EventTable.id,
type: EventTable.type,
data: EventTable.data,
})
.from(EventTable)
.where(and(gt(EventTable.seq, next), lte(EventTable.seq, latest)))
.orderBy(asc(EventTable.seq))
.all()
return { rows, oldest, latest }
})
}

export const EventRoutes = () =>
new Hono().get(
"/event",
Expand All @@ -27,24 +68,96 @@ export const EventRoutes = () =>
},
}),
async (c) => {
const raw = c.req.query("after_seq")
const after = parse(raw ?? undefined)
log.info("event connected")
c.header("Cache-Control", "no-cache, no-transform")
c.header("X-Accel-Buffering", "no")
c.header("X-Content-Type-Options", "nosniff")
return streamSSE(c, async (stream) => {
const q = new AsyncQueue<string | null>()
if (raw !== undefined) {
await stream.writeSSE({
data: JSON.stringify({
type: "server.connected",
properties: {},
}),
})

if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES || after === undefined) {
await stream.writeSSE({
data: JSON.stringify({
type: BusEvent.StreamExpired.type,
properties: {
next: after ?? -1,
oldest: 0,
latest: 0,
},
}),
})
return
}

const data = replay(after)
if (after < data.oldest) {
await stream.writeSSE({
data: JSON.stringify({
type: BusEvent.StreamExpired.type,
properties: {
next: after,
oldest: data.oldest,
latest: data.latest,
},
}),
})
return
}

for (const row of data.rows) {
await stream.writeSSE({
data: JSON.stringify({
id: row.id,
type: row.type,
properties: row.data,
}),
})
}
return
}

const limit = max()
const q = new AsyncQueue<string | null>({ max: limit })
let done = false
let dropped = 0

function push(data: string, input?: { force?: boolean }) {
if (q.push(data, input)) return
dropped++
q.clear()
q.push(
JSON.stringify({
type: BusEvent.StreamLagged.type,
properties: {
limit,
queued: q.size(),
dropped,
},
}),
{ force: true },
)
q.push(null, { force: true })
}

q.push(
push(
JSON.stringify({
type: "server.connected",
properties: {},
}),
{ force: true },
)

// Send heartbeat every 10s to prevent stalled proxy streams.
const heartbeat = setInterval(() => {
q.push(
push(
JSON.stringify({
type: "server.heartbeat",
properties: {},
Expand All @@ -57,12 +170,18 @@ export const EventRoutes = () =>
done = true
clearInterval(heartbeat)
unsub()
q.push(null)
q.push(null, { force: true })
log.info("event disconnected")
}

const unsub = Bus.subscribeAll((event) => {
q.push(JSON.stringify(event))
const id = EventID.ascending()
push(
JSON.stringify({
id,
...event,
}),
)
if (event.type === Bus.InstanceDisposed.type) {
stop()
}
Expand Down
26 changes: 23 additions & 3 deletions packages/opencode/src/util/queue.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,41 @@
export class AsyncQueue<T> implements AsyncIterable<T> {
private queue: T[] = []
private resolvers: ((value: T) => void)[] = []
private max: number

push(item: T) {
constructor(input?: { max?: number }) {
this.max = input?.max ?? Number.POSITIVE_INFINITY
}

push(item: T, input?: { force?: boolean }) {
const resolve = this.resolvers.shift()
if (resolve) resolve(item)
else this.queue.push(item)
if (resolve) {
resolve(item)
return true
}
if (!input?.force && this.queue.length >= this.max) {
return false
}
this.queue.push(item)
return true
}

async next(): Promise<T> {
if (this.queue.length > 0) return this.queue.shift()!
return new Promise((resolve) => this.resolvers.push(resolve))
}

clear() {
this.queue.length = 0
}

async *[Symbol.asyncIterator]() {
while (true) yield await this.next()
}

size() {
return this.queue.length
}
}

export async function work<T>(concurrency: number, items: T[], fn: (item: T) => Promise<void>) {
Expand Down
Loading
Loading