From 27093ec7bdf6fe9f39874862eda37deacf411ea1 Mon Sep 17 00:00:00 2001 From: echobt Date: Wed, 4 Feb 2026 15:47:36 +0000 Subject: [PATCH] fix(exec): add per-chunk streaming timeout to prevent indefinite hangs --- src/cortex-exec/src/runner.rs | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/src/cortex-exec/src/runner.rs b/src/cortex-exec/src/runner.rs index e831324..eb44688 100644 --- a/src/cortex-exec/src/runner.rs +++ b/src/cortex-exec/src/runner.rs @@ -32,6 +32,11 @@ const DEFAULT_TIMEOUT_SECS: u64 = 600; /// Default timeout for a single LLM request (2 minutes). const DEFAULT_REQUEST_TIMEOUT_SECS: u64 = 120; +/// Per-chunk timeout during streaming responses. +/// Prevents indefinite hangs when connections stall mid-stream. +/// See cortex_common::http_client for timeout hierarchy documentation. +const STREAMING_CHUNK_TIMEOUT_SECS: u64 = 30; + /// Maximum retries for transient errors. const MAX_RETRIES: usize = 3; @@ -555,7 +560,28 @@ impl ExecRunner { let mut partial_tool_calls: std::collections::HashMap = std::collections::HashMap::new(); - while let Some(event) = stream.next().await { + loop { + // Apply per-chunk timeout to prevent indefinite hangs when connections stall + let event = match tokio::time::timeout( + Duration::from_secs(STREAMING_CHUNK_TIMEOUT_SECS), + stream.next(), + ) + .await + { + Ok(Some(event)) => event, + Ok(None) => break, // Stream ended normally + Err(_) => { + tracing::warn!( + "Stream chunk timeout after {}s", + STREAMING_CHUNK_TIMEOUT_SECS + ); + return Err(CortexError::Provider(format!( + "Streaming timeout: no response chunk received within {}s", + STREAMING_CHUNK_TIMEOUT_SECS + ))); + } + }; + match event? { ResponseEvent::Delta(delta) => { if self.options.streaming {