diff --git a/packages/opencode/src/config/config.ts b/packages/opencode/src/config/config.ts
index 7969e3079574..c2a22e469eb4 100644
--- a/packages/opencode/src/config/config.ts
+++ b/packages/opencode/src/config/config.ts
@@ -1095,6 +1095,12 @@ export namespace Config {
             .positive()
             .optional()
             .describe("Timeout in milliseconds for model context protocol (MCP) requests"),
+          stream_idle_timeout: z
+            .number()
+            .int()
+            .nonnegative()
+            .optional()
+            .describe("Timeout in milliseconds between stream chunks from LLM. If no data is received within this period, the request will be retried. Default is 60000 (60 seconds). Set to 0 to disable."),
         })
         .optional(),
     })
diff --git a/packages/opencode/src/session/message-v2.ts b/packages/opencode/src/session/message-v2.ts
index 6358c6c5e9b0..d35b3972bf91 100644
--- a/packages/opencode/src/session/message-v2.ts
+++ b/packages/opencode/src/session/message-v2.ts
@@ -13,6 +13,17 @@ import { iife } from "@/util/iife"
 import { type SystemError } from "bun"
 import type { Provider } from "@/provider/provider"
 
+/**
+ * Error thrown when no data is received from the LLM stream within the timeout period.
+ * This typically indicates a stalled connection (network issues, LLM provider unresponsive).
+ */
+export class StreamIdleTimeoutError extends Error {
+  constructor(public readonly timeoutMs: number) {
+    super(`Stream idle timeout: no data received for ${timeoutMs}ms`)
+    this.name = "StreamIdleTimeoutError"
+  }
+}
+
 export namespace MessageV2 {
   export const OutputLengthError = NamedError.create("MessageOutputLengthError", z.object({}))
   export const AbortedError = NamedError.create("MessageAbortedError", z.object({ message: z.string() }))
@@ -697,6 +708,33 @@ export namespace MessageV2 {
           },
           { cause: e },
         ).toObject()
+      case e instanceof StreamIdleTimeoutError:
+        return new MessageV2.APIError(
+          {
+            message: e.message,
+            isRetryable: true,
+            metadata: {
+              timeoutMs: String(e.timeoutMs),
+            },
+          },
+          { cause: e },
+        ).toObject()
+      // Handle additional network errors that indicate transient connection issues
+      case ["ETIMEDOUT", "ENOTFOUND", "ECONNREFUSED", "EPIPE", "EHOSTUNREACH", "ENETUNREACH"].includes(
+        (e as SystemError)?.code ?? "",
+      ):
+        return new MessageV2.APIError(
+          {
+            message: `Network error: ${(e as SystemError).code}`,
+            isRetryable: true,
+            metadata: {
+              code: (e as SystemError).code ?? "",
+              syscall: (e as SystemError).syscall ?? "",
+              message: (e as SystemError).message ?? "",
+            },
+          },
+          { cause: e },
+        ).toObject()
       case APICallError.isInstance(e):
         const message = iife(() => {
           let msg = e.message
diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts
index 27071056180a..2f099979dce8 100644
--- a/packages/opencode/src/session/processor.ts
+++ b/packages/opencode/src/session/processor.ts
@@ -1,4 +1,4 @@
-import { MessageV2 } from "./message-v2"
+import { MessageV2, StreamIdleTimeoutError } from "./message-v2"
 import { Log } from "@/util/log"
 import { Identifier } from "@/id/id"
 import { Session } from "."
@@ -16,6 +16,53 @@ import { SessionCompaction } from "./compaction"
 import { PermissionNext } from "@/permission/next"
 import { Question } from "@/question"
 
+/**
+ * Wraps an async iterable with an idle timeout. If no value is yielded within
+ * the timeout period, throws a StreamIdleTimeoutError.
+ *
+ * This prevents the streaming loop from hanging indefinitely when:
+ * - Network connection drops mid-stream (TCP half-open)
+ * - LLM provider stalls without closing the connection
+ * - Proxy/gateway timeouts that don't properly terminate the stream
+ */
+async function* withIdleTimeout<T>(
+  stream: AsyncIterable<T>,
+  timeoutMs: number,
+  abort: AbortSignal,
+): AsyncGenerator<T, void, undefined> {
+  const iterator = stream[Symbol.asyncIterator]()
+
+  while (true) {
+    abort.throwIfAborted()
+
+    let timer: ReturnType<typeof setTimeout> | undefined
+    let onAbort: (() => void) | undefined
+
+    try {
+      // Race the next chunk against the idle timer and the abort signal.
+      // Rejecting on abort (instead of merely clearing the timer) ensures we
+      // never stay suspended on iterator.next() after the caller has given up.
+      const result = await new Promise<IteratorResult<T>>((resolve, reject) => {
+        timer = setTimeout(() => reject(new StreamIdleTimeoutError(timeoutMs)), timeoutMs)
+        onAbort = () => reject(abort.reason ?? new Error("aborted"))
+        abort.addEventListener("abort", onAbort, { once: true })
+        // Settling after the race already rejected is a harmless no-op, and
+        // routing next()'s rejection here avoids an unhandled rejection.
+        iterator.next().then(resolve, reject)
+      })
+      if (result.done) return
+      yield result.value
+    } catch (e) {
+      // Best-effort close of the underlying stream; don't await — it may be stalled.
+      void iterator.return?.().catch(() => {})
+      throw e
+    } finally {
+      if (timer) clearTimeout(timer)
+      if (onAbort) abort.removeEventListener("abort", onAbort)
+    }
+  }
+}
+
 export namespace SessionProcessor {
   const DOOM_LOOP_THRESHOLD = 3
   const log = Log.create({ service: "session.processor" })
@@ -45,14 +92,22 @@ export namespace SessionProcessor {
     async process(streamInput: LLM.StreamInput) {
       log.info("process")
       needsCompaction = false
-      const shouldBreak = (await Config.get()).experimental?.continue_loop_on_deny !== true
+      const config = await Config.get()
+      const shouldBreak = config.experimental?.continue_loop_on_deny !== true
+      // Default to 60 seconds between chunks, 0 disables the timeout
+      const streamIdleTimeout = config.experimental?.stream_idle_timeout ?? 60000
 
       while (true) {
         try {
           let currentText: MessageV2.TextPart | undefined
           let reasoningMap: Record<string, MessageV2.ReasoningPart> = {}
           const stream = await LLM.stream(streamInput)
-          for await (const value of stream.fullStream) {
+          // Wrap the stream with idle timeout to prevent hanging on stalled connections
+          const wrappedStream = streamIdleTimeout > 0
+            ? withIdleTimeout(stream.fullStream, streamIdleTimeout, input.abort)
+            : stream.fullStream
+
+          for await (const value of wrappedStream) {
             input.abort.throwIfAborted()
             switch (value.type) {
               case "start":
diff --git a/packages/opencode/test/session/stream-idle-timeout.test.ts b/packages/opencode/test/session/stream-idle-timeout.test.ts
new file mode 100644
index 000000000000..a0d93c57ccce
--- /dev/null
+++ b/packages/opencode/test/session/stream-idle-timeout.test.ts
@@ -0,0 +1,20 @@
+import { describe, test, expect } from "bun:test"
+import { StreamIdleTimeoutError } from "../../src/session/message-v2"
+
+// We can't import withIdleTimeout directly since it's not exported,
+// so we test the error handling integration
+
+describe("StreamIdleTimeoutError", () => {
+  test("has correct name and message", () => {
+    const error = new StreamIdleTimeoutError(60000)
+    expect(error.name).toBe("StreamIdleTimeoutError")
+    expect(error.message).toBe("Stream idle timeout: no data received for 60000ms")
+    expect(error.timeoutMs).toBe(60000)
+  })
+
+  test("is instanceof Error", () => {
+    const error = new StreamIdleTimeoutError(60000)
+    expect(error instanceof Error).toBe(true)
+    expect(error instanceof StreamIdleTimeoutError).toBe(true)
+  })
+})