Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion src/cortex-exec/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ const DEFAULT_TIMEOUT_SECS: u64 = 600;
/// Default timeout for a single LLM request (2 minutes).
const DEFAULT_REQUEST_TIMEOUT_SECS: u64 = 120;

/// Per-chunk timeout during streaming responses.
/// Prevents indefinite hangs when connections stall mid-stream.
/// See cortex_common::http_client for timeout hierarchy documentation.
const STREAMING_CHUNK_TIMEOUT_SECS: u64 = 30;

/// Maximum retries for transient errors.
const MAX_RETRIES: usize = 3;

Expand Down Expand Up @@ -555,7 +560,28 @@ impl ExecRunner {
let mut partial_tool_calls: std::collections::HashMap<String, (String, String)> =
std::collections::HashMap::new();

while let Some(event) = stream.next().await {
loop {
// Apply per-chunk timeout to prevent indefinite hangs when connections stall
let event = match tokio::time::timeout(
Duration::from_secs(STREAMING_CHUNK_TIMEOUT_SECS),
stream.next(),
)
.await
{
Ok(Some(event)) => event,
Ok(None) => break, // Stream ended normally
Err(_) => {
tracing::warn!(
"Stream chunk timeout after {}s",
STREAMING_CHUNK_TIMEOUT_SECS
);
return Err(CortexError::Provider(format!(
"Streaming timeout: no response chunk received within {}s",
STREAMING_CHUNK_TIMEOUT_SECS
)));
}
};

match event? {
ResponseEvent::Delta(delta) => {
if self.options.streaming {
Expand Down
Loading