Skip to content

Commit 0540751

Browse files
authored
fix(core): use a queue to process events in event routes (#18259)
1 parent baa2041 commit 0540751

File tree

4 files changed

+123
-83
lines changed

4 files changed

+123
-83
lines changed

packages/opencode/src/bus/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ export namespace Bus {
5151
})
5252
const pending = []
5353
for (const key of [def.type, "*"]) {
54-
const match = state().subscriptions.get(key)
55-
for (const sub of match ?? []) {
54+
const match = [...(state().subscriptions.get(key) ?? [])]
55+
for (const sub of match) {
5656
pending.push(sub(payload))
5757
}
5858
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import { Hono } from "hono"
2+
import { describeRoute, resolver } from "hono-openapi"
3+
import { streamSSE } from "hono/streaming"
4+
import { Log } from "@/util/log"
5+
import { BusEvent } from "@/bus/bus-event"
6+
import { Bus } from "@/bus"
7+
import { lazy } from "../../util/lazy"
8+
import { AsyncQueue } from "../../util/queue"
9+
import { Instance } from "@/project/instance"
10+
11+
const log = Log.create({ service: "server" })
12+
13+
export const EventRoutes = lazy(() =>
14+
new Hono().get(
15+
"/event",
16+
describeRoute({
17+
summary: "Subscribe to events",
18+
description: "Get events",
19+
operationId: "event.subscribe",
20+
responses: {
21+
200: {
22+
description: "Event stream",
23+
content: {
24+
"text/event-stream": {
25+
schema: resolver(BusEvent.payloads()),
26+
},
27+
},
28+
},
29+
},
30+
}),
31+
async (c) => {
32+
log.info("event connected")
33+
c.header("X-Accel-Buffering", "no")
34+
c.header("X-Content-Type-Options", "nosniff")
35+
return streamSSE(c, async (stream) => {
36+
const q = new AsyncQueue<string | null>()
37+
let done = false
38+
39+
q.push(
40+
JSON.stringify({
41+
type: "server.connected",
42+
properties: {},
43+
}),
44+
)
45+
46+
// Send heartbeat every 10s to prevent stalled proxy streams.
47+
const heartbeat = setInterval(() => {
48+
q.push(
49+
JSON.stringify({
50+
type: "server.heartbeat",
51+
properties: {},
52+
}),
53+
)
54+
}, 10_000)
55+
56+
const unsub = Bus.subscribeAll((event) => {
57+
q.push(JSON.stringify(event))
58+
if (event.type === Bus.InstanceDisposed.type) {
59+
stop()
60+
}
61+
})
62+
63+
const stop = () => {
64+
if (done) return
65+
done = true
66+
clearInterval(heartbeat)
67+
unsub()
68+
q.push(null)
69+
log.info("event disconnected")
70+
}
71+
72+
stream.onAbort(stop)
73+
74+
try {
75+
for await (const data of q) {
76+
if (data === null) return
77+
await stream.writeSSE({ data })
78+
}
79+
} finally {
80+
stop()
81+
}
82+
})
83+
},
84+
),
85+
)

packages/opencode/src/server/routes/global.ts

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { streamSSE } from "hono/streaming"
44
import z from "zod"
55
import { BusEvent } from "@/bus/bus-event"
66
import { GlobalBus } from "@/bus/global"
7+
import { AsyncQueue } from "@/util/queue"
78
import { Instance } from "../../project/instance"
89
import { Installation } from "@/installation"
910
import { Log } from "../../util/log"
@@ -69,41 +70,54 @@ export const GlobalRoutes = lazy(() =>
6970
c.header("X-Accel-Buffering", "no")
7071
c.header("X-Content-Type-Options", "nosniff")
7172
return streamSSE(c, async (stream) => {
72-
stream.writeSSE({
73-
data: JSON.stringify({
73+
const q = new AsyncQueue<string | null>()
74+
let done = false
75+
76+
q.push(
77+
JSON.stringify({
7478
payload: {
7579
type: "server.connected",
7680
properties: {},
7781
},
7882
}),
79-
})
80-
async function handler(event: any) {
81-
await stream.writeSSE({
82-
data: JSON.stringify(event),
83-
})
84-
}
85-
GlobalBus.on("event", handler)
83+
)
8684

8785
// Send heartbeat every 10s to prevent stalled proxy streams.
8886
const heartbeat = setInterval(() => {
89-
stream.writeSSE({
90-
data: JSON.stringify({
87+
q.push(
88+
JSON.stringify({
9189
payload: {
9290
type: "server.heartbeat",
9391
properties: {},
9492
},
9593
}),
96-
})
94+
)
9795
}, 10_000)
9896

99-
await new Promise<void>((resolve) => {
100-
stream.onAbort(() => {
101-
clearInterval(heartbeat)
102-
GlobalBus.off("event", handler)
103-
resolve()
104-
log.info("global event disconnected")
105-
})
106-
})
97+
async function handler(event: any) {
98+
q.push(JSON.stringify(event))
99+
}
100+
GlobalBus.on("event", handler)
101+
102+
const stop = () => {
103+
if (done) return
104+
done = true
105+
clearInterval(heartbeat)
106+
GlobalBus.off("event", handler)
107+
q.push(null)
108+
log.info("event disconnected")
109+
}
110+
111+
stream.onAbort(stop)
112+
113+
try {
114+
for await (const data of q) {
115+
if (data === null) return
116+
await stream.writeSSE({ data })
117+
}
118+
} finally {
119+
stop()
120+
}
107121
})
108122
},
109123
)

packages/opencode/src/server/server.ts

Lines changed: 2 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
1-
import { BusEvent } from "@/bus/bus-event"
2-
import { Bus } from "@/bus"
31
import { Log } from "../util/log"
42
import { describeRoute, generateSpecs, validator, resolver, openAPIRouteHandler } from "hono-openapi"
53
import { Hono } from "hono"
64
import { cors } from "hono/cors"
7-
import { streamSSE } from "hono/streaming"
85
import { proxy } from "hono/proxy"
96
import { basicAuth } from "hono/basic-auth"
107
import z from "zod"
@@ -34,6 +31,7 @@ import { FileRoutes } from "./routes/file"
3431
import { ConfigRoutes } from "./routes/config"
3532
import { ExperimentalRoutes } from "./routes/experimental"
3633
import { ProviderRoutes } from "./routes/provider"
34+
import { EventRoutes } from "./routes/event"
3735
import { InstanceBootstrap } from "../project/bootstrap"
3836
import { NotFoundError } from "../storage/db"
3937
import type { ContentfulStatusCode } from "hono/utils/http-status"
@@ -251,6 +249,7 @@ export namespace Server {
251249
.route("/question", QuestionRoutes())
252250
.route("/provider", ProviderRoutes())
253251
.route("/", FileRoutes())
252+
.route("/", EventRoutes())
254253
.route("/mcp", McpRoutes())
255254
.route("/tui", TuiRoutes())
256255
.post(
@@ -498,64 +497,6 @@ export namespace Server {
498497
return c.json(await Format.status())
499498
},
500499
)
501-
.get(
502-
"/event",
503-
describeRoute({
504-
summary: "Subscribe to events",
505-
description: "Get events",
506-
operationId: "event.subscribe",
507-
responses: {
508-
200: {
509-
description: "Event stream",
510-
content: {
511-
"text/event-stream": {
512-
schema: resolver(BusEvent.payloads()),
513-
},
514-
},
515-
},
516-
},
517-
}),
518-
async (c) => {
519-
log.info("event connected")
520-
c.header("X-Accel-Buffering", "no")
521-
c.header("X-Content-Type-Options", "nosniff")
522-
return streamSSE(c, async (stream) => {
523-
stream.writeSSE({
524-
data: JSON.stringify({
525-
type: "server.connected",
526-
properties: {},
527-
}),
528-
})
529-
const unsub = Bus.subscribeAll(async (event) => {
530-
await stream.writeSSE({
531-
data: JSON.stringify(event),
532-
})
533-
if (event.type === Bus.InstanceDisposed.type) {
534-
stream.close()
535-
}
536-
})
537-
538-
// Send heartbeat every 10s to prevent stalled proxy streams.
539-
const heartbeat = setInterval(() => {
540-
stream.writeSSE({
541-
data: JSON.stringify({
542-
type: "server.heartbeat",
543-
properties: {},
544-
}),
545-
})
546-
}, 10_000)
547-
548-
await new Promise<void>((resolve) => {
549-
stream.onAbort(() => {
550-
clearInterval(heartbeat)
551-
unsub()
552-
resolve()
553-
log.info("event disconnected")
554-
})
555-
})
556-
})
557-
},
558-
)
559500
.all("/*", async (c) => {
560501
const path = c.req.path
561502

0 commit comments

Comments
 (0)