Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/opencode/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1095,6 +1095,12 @@ export namespace Config {
.positive()
.optional()
.describe("Timeout in milliseconds for model context protocol (MCP) requests"),
stream_idle_timeout: z
.number()
.int()
.positive()
.optional()
.describe("Timeout in milliseconds between stream chunks from LLM. If no data is received within this period, the request will be retried. Default is 60000 (60 seconds). Set to 0 to disable."),
})
.optional(),
})
Expand Down
38 changes: 38 additions & 0 deletions packages/opencode/src/session/message-v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@ import { iife } from "@/util/iife"
import { type SystemError } from "bun"
import type { Provider } from "@/provider/provider"

/**
* Error thrown when no data is received from the LLM stream within the timeout period.
* This typically indicates a stalled connection (network issues, LLM provider unresponsive).
*/
export class StreamIdleTimeoutError extends Error {
constructor(public readonly timeoutMs: number) {
super(`Stream idle timeout: no data received for ${timeoutMs}ms`)
this.name = "StreamIdleTimeoutError"
}
}

export namespace MessageV2 {
export const OutputLengthError = NamedError.create("MessageOutputLengthError", z.object({}))
export const AbortedError = NamedError.create("MessageAbortedError", z.object({ message: z.string() }))
Expand Down Expand Up @@ -697,6 +708,33 @@ export namespace MessageV2 {
},
{ cause: e },
).toObject()
case e instanceof StreamIdleTimeoutError:
return new MessageV2.APIError(
{
message: e.message,
isRetryable: true,
metadata: {
timeoutMs: String(e.timeoutMs),
},
},
{ cause: e },
).toObject()
// Handle additional network errors that indicate transient connection issues
case ["ETIMEDOUT", "ENOTFOUND", "ECONNREFUSED", "EPIPE", "EHOSTUNREACH", "ENETUNREACH"].includes(
(e as SystemError)?.code ?? ""
):
return new MessageV2.APIError(
{
message: `Network error: ${(e as SystemError).code}`,
isRetryable: true,
metadata: {
code: (e as SystemError).code ?? "",
syscall: (e as SystemError).syscall ?? "",
message: (e as SystemError).message ?? "",
},
},
{ cause: e },
).toObject()
case APICallError.isInstance(e):
const message = iife(() => {
let msg = e.message
Expand Down
70 changes: 67 additions & 3 deletions packages/opencode/src/session/processor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { MessageV2 } from "./message-v2"
import { MessageV2, StreamIdleTimeoutError } from "./message-v2"
import { Log } from "@/util/log"
import { Identifier } from "@/id/id"
import { Session } from "."
Expand All @@ -16,6 +16,62 @@ import { SessionCompaction } from "./compaction"
import { PermissionNext } from "@/permission/next"
import { Question } from "@/question"

/**
* Wraps an async iterable with an idle timeout. If no value is yielded within
* the timeout period, throws a StreamIdleTimeoutError.
*
* This prevents the streaming loop from hanging indefinitely when:
* - Network connection drops mid-stream (TCP half-open)
* - LLM provider stalls without closing the connection
* - Proxy/gateway timeouts that don't properly terminate the stream
*/
async function* withIdleTimeout<T>(
stream: AsyncIterable<T>,
timeoutMs: number,
abort: AbortSignal
): AsyncGenerator<T> {
const iterator = stream[Symbol.asyncIterator]()

while (true) {
abort.throwIfAborted()

let timer: ReturnType<typeof setTimeout> | undefined
let rejectTimeout: ((error: Error) => void) | undefined

const timeoutPromise = new Promise<never>((_, reject) => {
rejectTimeout = reject
timer = setTimeout(() => {
reject(new StreamIdleTimeoutError(timeoutMs))
}, timeoutMs)
})

// Clean up timer when abort signal fires
const abortHandler = () => {
if (timer) clearTimeout(timer)
}
abort.addEventListener("abort", abortHandler, { once: true })

try {
const result = await Promise.race([
iterator.next(),
timeoutPromise
])

// Clear the timer since we got a result
if (timer) clearTimeout(timer)
abort.removeEventListener("abort", abortHandler)

if (result.done) return
yield result.value
} catch (e) {
// Clean up on error too
if (timer) clearTimeout(timer)
abort.removeEventListener("abort", abortHandler)
throw e
}
}
}

export namespace SessionProcessor {
const DOOM_LOOP_THRESHOLD = 3
const log = Log.create({ service: "session.processor" })
Expand Down Expand Up @@ -45,14 +101,22 @@ export namespace SessionProcessor {
async process(streamInput: LLM.StreamInput) {
log.info("process")
needsCompaction = false
const shouldBreak = (await Config.get()).experimental?.continue_loop_on_deny !== true
const config = await Config.get()
const shouldBreak = config.experimental?.continue_loop_on_deny !== true
// Default to 60 seconds between chunks, 0 disables the timeout
const streamIdleTimeout = config.experimental?.stream_idle_timeout ?? 60000
while (true) {
try {
let currentText: MessageV2.TextPart | undefined
let reasoningMap: Record<string, MessageV2.ReasoningPart> = {}
const stream = await LLM.stream(streamInput)

for await (const value of stream.fullStream) {
// Wrap the stream with idle timeout to prevent hanging on stalled connections
const wrappedStream = streamIdleTimeout > 0
? withIdleTimeout(stream.fullStream, streamIdleTimeout, input.abort)
: stream.fullStream

for await (const value of wrappedStream) {
input.abort.throwIfAborted()
switch (value.type) {
case "start":
Expand Down
20 changes: 20 additions & 0 deletions packages/opencode/test/session/stream-idle-timeout.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { describe, test, expect } from "bun:test"
import { StreamIdleTimeoutError } from "../../src/session/message-v2"

// We can't import withIdleTimeout directly since it's not exported,
// so we test the error handling integration

describe("StreamIdleTimeoutError", () => {
test("has correct name and message", () => {
const error = new StreamIdleTimeoutError(60000)
expect(error.name).toBe("StreamIdleTimeoutError")
expect(error.message).toBe("Stream idle timeout: no data received for 60000ms")
expect(error.timeoutMs).toBe(60000)
})

test("is instanceof Error", () => {
const error = new StreamIdleTimeoutError(60000)
expect(error instanceof Error).toBe(true)
expect(error instanceof StreamIdleTimeoutError).toBe(true)
})
})
Loading