From c1e69dd10062d3f16cb050527526e4ceef03f8dd Mon Sep 17 00:00:00 2001 From: echobt Date: Wed, 4 Feb 2026 15:47:25 +0000 Subject: [PATCH 1/3] refactor: consolidated code quality improvements - constants, lazy init, and documentation This PR consolidates the following refactoring changes: - #27: Centralize timeout constants - #43: Replace magic numbers with documented named constants - #67: Use LazyLock for static regex initialization - #68: Extract subagent timeout values as named constants Key changes: - Created centralized timeout module with documented constants - Replaced scattered magic numbers with well-documented constants - Improved static initialization using LazyLock for regexes - Added comprehensive documentation for timeout hierarchy --- Cargo.lock | 1 + src/cortex-app-server/src/auth.rs | 6 +- src/cortex-app-server/src/config.rs | 10 ++ src/cortex-app-server/src/storage.rs | 3 - src/cortex-app-server/src/streaming.rs | 4 +- src/cortex-apply-patch/src/hunk.rs | 11 -- src/cortex-common/src/config_substitution.rs | 50 +++++---- src/cortex-common/src/http_client.rs | 48 ++++++++ src/cortex-common/src/lib.rs | 6 + src/cortex-common/src/timeout.rs | 65 +++++++++++ src/cortex-engine/src/tools/handlers/batch.rs | 40 +++++-- src/cortex-engine/src/tools/handlers/grep.rs | 1 + src/cortex-engine/src/tools/handlers/mod.rs | 5 +- .../src/tools/unified_executor.rs | 1 + src/cortex-exec/Cargo.toml | 1 + src/cortex-exec/src/runner.rs | 8 ++ src/cortex-mcp-client/src/transport.rs | 103 +----------------- .../src/runner/event_loop/streaming.rs | 16 ++- .../src/runner/event_loop/subagent.rs | 12 +- 19 files changed, 228 insertions(+), 163 deletions(-) create mode 100644 src/cortex-common/src/timeout.rs diff --git a/Cargo.lock b/Cargo.lock index de5594c..f2363d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1204,6 +1204,7 @@ dependencies = [ name = "cortex-exec" version = "0.0.6" dependencies = [ + "cortex-common", "cortex-engine", "cortex-protocol", "serde_json", diff --git a/src/cortex-app-server/src/auth.rs b/src/cortex-app-server/src/auth.rs index 414f36f..4f240c3 100644 --- a/src/cortex-app-server/src/auth.rs +++ b/src/cortex-app-server/src/auth.rs @@ -45,7 +45,7 @@ impl Claims { pub fn new(user_id: impl Into, expiry_seconds: u64) -> Self { let now = SystemTime::now() .duration_since(UNIX_EPOCH) - .unwrap() + .unwrap_or_default() .as_secs(); Self { @@ -75,7 +75,7 @@ impl Claims { pub fn is_expired(&self) -> bool { let now = SystemTime::now() .duration_since(UNIX_EPOCH) - .unwrap() + .unwrap_or_default() .as_secs(); self.exp < now } @@ -187,7 +187,7 @@ impl AuthService { pub async fn cleanup_revoked_tokens(&self) { let now = SystemTime::now() .duration_since(UNIX_EPOCH) - .unwrap() + .unwrap_or_default() .as_secs(); let mut revoked = self.revoked_tokens.write().await; diff --git a/src/cortex-app-server/src/config.rs b/src/cortex-app-server/src/config.rs index 35ac75b..b37389b 100644 --- a/src/cortex-app-server/src/config.rs +++ b/src/cortex-app-server/src/config.rs @@ -49,12 +49,18 @@ pub struct ServerConfig { pub max_body_size: usize, /// Request timeout in seconds (applies to full request lifecycle). + /// + /// See `cortex_common::http_client` module documentation for the complete + /// timeout hierarchy across Cortex services. #[serde(default = "default_request_timeout")] pub request_timeout: u64, /// Read timeout for individual chunks in seconds. /// Applies to chunked transfer encoding to prevent indefinite hangs /// when clients disconnect without sending the terminal chunk. + /// + /// See `cortex_common::http_client` module documentation for the complete + /// timeout hierarchy across Cortex services. #[serde(default = "default_read_timeout")] pub read_timeout: u64, @@ -71,12 +77,16 @@ pub struct ServerConfig { pub cors_origins: Vec, /// Graceful shutdown timeout in seconds. + /// + /// See `cortex_common::http_client` module documentation for the complete + /// timeout hierarchy across Cortex services. #[serde(default = "default_shutdown_timeout")] pub shutdown_timeout: u64, } fn default_shutdown_timeout() -> u64 { 30 // 30 seconds for graceful shutdown + // See cortex_common::http_client for timeout hierarchy documentation } fn default_listen_addr() -> String { diff --git a/src/cortex-app-server/src/storage.rs b/src/cortex-app-server/src/storage.rs index 6c5d44e..1aa617f 100644 --- a/src/cortex-app-server/src/storage.rs +++ b/src/cortex-app-server/src/storage.rs @@ -47,8 +47,6 @@ pub struct StoredToolCall { /// Session storage manager. pub struct SessionStorage { - #[allow(dead_code)] - base_dir: PathBuf, sessions_dir: PathBuf, history_dir: PathBuf, } @@ -66,7 +64,6 @@ impl SessionStorage { info!("Session storage initialized at {:?}", base_dir); Ok(Self { - base_dir, sessions_dir, history_dir, }) diff --git a/src/cortex-app-server/src/streaming.rs b/src/cortex-app-server/src/streaming.rs index 840b1a6..86300c8 100644 --- a/src/cortex-app-server/src/streaming.rs +++ b/src/cortex-app-server/src/streaming.rs @@ -510,10 +510,10 @@ async fn session_events_stream( serde_json::to_string(&StreamEvent::Ping { timestamp: std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) - .unwrap() + .unwrap_or_default() .as_secs(), }) - .unwrap(), + .unwrap_or_default(), ))) .await; diff --git a/src/cortex-apply-patch/src/hunk.rs b/src/cortex-apply-patch/src/hunk.rs index ea67a97..ab5b1f1 100644 --- a/src/cortex-apply-patch/src/hunk.rs +++ b/src/cortex-apply-patch/src/hunk.rs @@ -250,9 +250,6 @@ pub struct SearchReplace { pub search: String, /// The text to replace with. pub replace: String, - /// Replace all occurrences (true) or just the first (false). - #[allow(dead_code)] - pub replace_all: bool, } impl SearchReplace { @@ -266,16 +263,8 @@ impl SearchReplace { path: path.into(), search: search.into(), replace: replace.into(), - replace_all: false, } } - - /// Set whether to replace all occurrences. - #[allow(dead_code)] - pub fn with_replace_all(mut self, replace_all: bool) -> Self { - self.replace_all = replace_all; - self - } } #[cfg(test)] diff --git a/src/cortex-common/src/config_substitution.rs b/src/cortex-common/src/config_substitution.rs index 49bff00..4ecb861 100644 --- a/src/cortex-common/src/config_substitution.rs +++ b/src/cortex-common/src/config_substitution.rs @@ -8,8 +8,23 @@ use regex::Regex; use std::path::PathBuf; +use std::sync::LazyLock; use thiserror::Error; +/// Static regex for environment variable substitution: {env:VAR} or {env:VAR:default} +/// Group 1: variable name +/// Group 2: optional default value (after second colon) +static ENV_REGEX: LazyLock = LazyLock::new(|| { + Regex::new(r"\{env:([^:}]+)(?::([^}]*))?\}") + .expect("env regex pattern is valid and tested") +}); + +/// Static regex for file content substitution: {file:path} +/// Group 1: file path +static FILE_REGEX: LazyLock = LazyLock::new(|| { + Regex::new(r"\{file:([^}]+)\}").expect("file regex pattern is valid and tested") +}); + /// Errors that can occur during configuration substitution. #[derive(Debug, Error)] pub enum SubstitutionError { @@ -42,11 +57,13 @@ pub enum SubstitutionError { /// /// Handles replacement of `{env:...}` and `{file:...}` placeholders /// in configuration strings. +/// +/// This struct uses statically initialized regex patterns via `LazyLock`, +/// making regex compilation a one-time cost shared across all instances. pub struct ConfigSubstitution { - /// Regex for environment variable substitution: {env:VAR} or {env:VAR:default} - env_regex: Regex, - /// Regex for file content substitution: {file:path} - file_regex: Regex, + // This struct is kept for API compatibility. + // Regex patterns are now static module-level constants. + _private: (), } impl Default for ConfigSubstitution { @@ -56,22 +73,13 @@ impl Default for ConfigSubstitution { } impl ConfigSubstitution { - /// Creates a new `ConfigSubstitution` instance with compiled regex patterns. + /// Creates a new `ConfigSubstitution` instance. + /// + /// The regex patterns are statically initialized on first use, + /// so creating multiple instances has no additional cost. #[must_use] pub fn new() -> Self { - Self { - // Matches {env:VAR_NAME} or {env:VAR_NAME:default_value} - // Group 1: variable name - // Group 2: optional default value (after second colon) - env_regex: Regex::new(r"\{env:([^:}]+)(?::([^}]*))?\}").unwrap_or_else(|e| { - panic!("Failed to compile env regex: {e}"); - }), - // Matches {file:path} - // Group 1: file path - file_regex: Regex::new(r"\{file:([^}]+)\}").unwrap_or_else(|e| { - panic!("Failed to compile file regex: {e}"); - }), - } + Self { _private: () } } /// Substitutes all variables in a string. @@ -109,8 +117,7 @@ impl ConfigSubstitution { let mut error: Option = None; // Collect all matches first to avoid borrowing issues - let matches: Vec<_> = self - .env_regex + let matches: Vec<_> = ENV_REGEX .captures_iter(input) .map(|cap| { let full_match = cap.get(0).map(|m| m.as_str().to_string()); @@ -155,8 +162,7 @@ impl ConfigSubstitution { let mut error: Option = None; // Collect all matches first - let matches: Vec<_> = self - .file_regex + let matches: Vec<_> = FILE_REGEX .captures_iter(input) .map(|cap| { let full_match = cap.get(0).map(|m| m.as_str().to_string()); diff --git a/src/cortex-common/src/http_client.rs b/src/cortex-common/src/http_client.rs index b181ac8..3b290ff 100644 --- a/src/cortex-common/src/http_client.rs +++ b/src/cortex-common/src/http_client.rs @@ -9,6 +9,54 @@ //! //! DNS caching is configured with reasonable TTL to allow failover and load //! balancer updates (#2177). +//! +//! # Timeout Configuration Guide +//! +//! This section documents the timeout hierarchy across the Cortex codebase. Use this +//! as a reference when configuring timeouts for new features or debugging timeout issues. +//! +//! ## Timeout Hierarchy +//! +//! | Use Case | Timeout | Constant/Location | Rationale | +//! |-----------------------------|---------|--------------------------------------------|-----------------------------------------| +//! | Health checks | 5s | `HEALTH_CHECK_TIMEOUT` (this module) | Quick validation of service status | +//! | Standard HTTP requests | 30s | `DEFAULT_TIMEOUT` (this module) | Normal API calls with reasonable margin | +//! | Per-chunk read (streaming) | 30s | `read_timeout` (cortex-app-server/config) | Individual chunk timeout during stream | +//! | Pool idle timeout | 60s | `POOL_IDLE_TIMEOUT` (this module) | DNS re-resolution for failover | +//! | LLM Request (non-streaming) | 120s | `DEFAULT_REQUEST_TIMEOUT_SECS` (cortex-exec/runner) | Model inference takes time | +//! | LLM Streaming total | 300s | `STREAMING_TIMEOUT` (this module) | Long-running streaming responses | +//! | Server request lifecycle | 300s | `request_timeout` (cortex-app-server/config) | Full HTTP request/response cycle | +//! | Entire exec session | 600s | `DEFAULT_TIMEOUT_SECS` (cortex-exec/runner) | Multi-turn conversation limit | +//! | Graceful shutdown | 30s | `shutdown_timeout` (cortex-app-server/config) | Time for cleanup on shutdown | +//! +//! ## Module-Specific Timeouts +//! +//! ### cortex-common (this module) +//! - `DEFAULT_TIMEOUT` (30s): Use for standard API calls. +//! - `STREAMING_TIMEOUT` (300s): Use for LLM streaming endpoints. +//! - `HEALTH_CHECK_TIMEOUT` (5s): Use for health/readiness checks. +//! - `POOL_IDLE_TIMEOUT` (60s): Connection pool cleanup for DNS freshness. +//! +//! ### cortex-exec (runner.rs) +//! - `DEFAULT_TIMEOUT_SECS` (600s): Maximum duration for entire exec session. +//! - `DEFAULT_REQUEST_TIMEOUT_SECS` (120s): Single LLM request timeout. +//! +//! ### cortex-app-server (config.rs) +//! - `request_timeout` (300s): Full request lifecycle timeout. +//! - `read_timeout` (30s): Per-chunk timeout for streaming reads. +//! - `shutdown_timeout` (30s): Graceful shutdown duration. +//! +//! ### cortex-engine (api_client.rs) +//! - Re-exports constants from this module for consistency. +//! +//! ## Recommendations +//! +//! When adding new timeout configurations: +//! 1. Use constants from this module when possible for consistency. +//! 2. Document any new timeout constants with their rationale. +//! 3. Consider the timeout hierarchy - inner timeouts should be shorter than outer ones. +//! 4. For LLM operations, use longer timeouts (120s-300s) to accommodate model inference. +//! 5. For health checks and quick validations, use short timeouts (5s-10s). use reqwest::Client; use std::time::Duration; diff --git a/src/cortex-common/src/lib.rs b/src/cortex-common/src/lib.rs index 0f30c44..4cfc023 100644 --- a/src/cortex-common/src/lib.rs +++ b/src/cortex-common/src/lib.rs @@ -18,6 +18,7 @@ pub mod signal_safety; pub mod subprocess_env; pub mod subprocess_output; pub mod text_sanitize; +pub mod timeout; pub mod truncate; #[cfg(feature = "cli")] @@ -73,6 +74,11 @@ pub use subprocess_output::{ pub use text_sanitize::{ has_control_chars, normalize_code_fences, sanitize_control_chars, sanitize_for_terminal, }; +pub use timeout::{ + DEFAULT_BATCH_TIMEOUT_SECS, DEFAULT_EXEC_TIMEOUT_SECS, DEFAULT_HEALTH_CHECK_TIMEOUT_SECS, + DEFAULT_READ_TIMEOUT_SECS, DEFAULT_REQUEST_TIMEOUT_SECS, DEFAULT_SHUTDOWN_TIMEOUT_SECS, + DEFAULT_STREAMING_TIMEOUT_SECS, +}; pub use truncate::{ truncate_command, truncate_first_line, truncate_for_display, truncate_id, truncate_id_default, truncate_model_name, truncate_with_ellipsis, truncate_with_unicode_ellipsis, diff --git a/src/cortex-common/src/timeout.rs b/src/cortex-common/src/timeout.rs new file mode 100644 index 0000000..1f7ec37 --- /dev/null +++ b/src/cortex-common/src/timeout.rs @@ -0,0 +1,65 @@ +//! Centralized timeout constants for the Cortex CLI. +//! +//! This module provides consistent timeout values used throughout the codebase. +//! Centralizing these values ensures uniformity and makes it easier to adjust +//! timeouts across the application. + +/// Default timeout for the entire execution in seconds (10 minutes). +/// +/// This is the maximum time allowed for a complete headless execution, +/// including all LLM requests and tool executions. +pub const DEFAULT_EXEC_TIMEOUT_SECS: u64 = 600; + +/// Default timeout for a single LLM request in seconds (2 minutes). +/// +/// This is the maximum time to wait for a single completion request +/// to the LLM provider. +pub const DEFAULT_REQUEST_TIMEOUT_SECS: u64 = 120; + +/// Default timeout for streaming responses in seconds (5 minutes). +/// +/// Extended timeout for LLM streaming requests where responses are +/// delivered incrementally over time. +pub const DEFAULT_STREAMING_TIMEOUT_SECS: u64 = 300; + +/// Default timeout for health check requests in seconds (5 seconds). +/// +/// Short timeout used for quick health check endpoints. +pub const DEFAULT_HEALTH_CHECK_TIMEOUT_SECS: u64 = 5; + +/// Default timeout for graceful shutdown in seconds (30 seconds). +/// +/// Maximum time to wait for in-flight operations to complete during +/// shutdown before forcing termination. +pub const DEFAULT_SHUTDOWN_TIMEOUT_SECS: u64 = 30; + +/// Default timeout for batch execution in seconds (5 minutes). +/// +/// Maximum time allowed for executing a batch of parallel tool calls. +pub const DEFAULT_BATCH_TIMEOUT_SECS: u64 = 300; + +/// Default timeout for individual read operations in seconds (30 seconds). +/// +/// Timeout for individual read operations to prevent hangs when +/// Content-Length doesn't match actual body size. +pub const DEFAULT_READ_TIMEOUT_SECS: u64 = 30; + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_timeout_values_are_reasonable() { + // Exec timeout should be greater than request timeout + assert!(DEFAULT_EXEC_TIMEOUT_SECS > DEFAULT_REQUEST_TIMEOUT_SECS); + + // Streaming timeout should be greater than request timeout + assert!(DEFAULT_STREAMING_TIMEOUT_SECS > DEFAULT_REQUEST_TIMEOUT_SECS); + + // Health check should be short + assert!(DEFAULT_HEALTH_CHECK_TIMEOUT_SECS <= 10); + + // Batch timeout should be reasonable + assert!(DEFAULT_BATCH_TIMEOUT_SECS >= 60); + } +} diff --git a/src/cortex-engine/src/tools/handlers/batch.rs b/src/cortex-engine/src/tools/handlers/batch.rs index 81ec81f..e3d4542 100644 --- a/src/cortex-engine/src/tools/handlers/batch.rs +++ b/src/cortex-engine/src/tools/handlers/batch.rs @@ -20,8 +20,9 @@ use crate::tools::spec::{ToolDefinition, ToolHandler, ToolResult}; /// Maximum number of tools that can be executed in a batch. pub const MAX_BATCH_SIZE: usize = 10; -/// Default timeout for batch execution in seconds. -pub const DEFAULT_BATCH_TIMEOUT_SECS: u64 = 300; +/// Default timeout for individual tool execution in seconds. +/// This prevents a single tool from consuming the entire batch timeout. +pub const DEFAULT_TOOL_TIMEOUT_SECS: u64 = 60; /// Tools that cannot be called within a batch (prevent recursion). /// Note: Task is now allowed to enable parallel task execution. @@ -45,6 +46,10 @@ pub struct BatchToolArgs { /// Optional timeout in seconds for the entire batch (default: 300s). #[serde(default)] pub timeout_secs: Option, + /// Optional timeout in seconds for individual tools (default: 60s). + /// This prevents a single tool from consuming the entire batch timeout. + #[serde(default)] + pub tool_timeout_secs: Option, } /// Result of a single tool call within the batch. @@ -158,7 +163,7 @@ impl BatchToolHandler { &self, calls: Vec, context: &ToolContext, - timeout_duration: Duration, + tool_timeout: Duration, ) -> BatchResult { let start_time = Instant::now(); let results = Arc::new(Mutex::new(Vec::with_capacity(calls.len()))); @@ -176,9 +181,9 @@ impl BatchToolHandler { async move { let call_start = Instant::now(); - // Execute with per-call timeout (use batch timeout for each call) + // Execute with per-tool timeout to prevent single tools from blocking others let execution_result = timeout( - timeout_duration, + tool_timeout, executor.execute_tool(&call.tool, call.arguments, &ctx), ) .await; @@ -202,7 +207,7 @@ impl BatchToolHandler { duration_ms, }, Ok(Err(e)) => BatchCallResult { - tool: tool_name, + tool: tool_name.clone(), index, success: false, result: None, @@ -210,11 +215,15 @@ impl BatchToolHandler { duration_ms, }, Err(_) => BatchCallResult { - tool: tool_name, + tool: tool_name.clone(), index, success: false, result: None, - error: Some(format!("Timeout after {}s", timeout_duration.as_secs())), + error: Some(format!( + "Tool '{}' timed out after {}s", + tool_name, + tool_timeout.as_secs() + )), duration_ms, }, }; @@ -328,13 +337,13 @@ impl ToolHandler for BatchToolHandler { // Validate calls self.validate_calls(&args.calls)?; - // Determine timeout - let timeout_secs = args.timeout_secs.unwrap_or(DEFAULT_BATCH_TIMEOUT_SECS); - let timeout_duration = Duration::from_secs(timeout_secs); + // Determine per-tool timeout (prevents single tool from blocking others) + let tool_timeout_secs = args.tool_timeout_secs.unwrap_or(DEFAULT_TOOL_TIMEOUT_SECS); + let tool_timeout = Duration::from_secs(tool_timeout_secs); // Execute all tools in parallel let batch_result = self - .execute_parallel(args.calls, context, timeout_duration) + .execute_parallel(args.calls, context, tool_timeout) .await; // Format output @@ -384,6 +393,12 @@ pub fn batch_tool_definition() -> ToolDefinition { "description": "Optional timeout in seconds for the entire batch execution (default: 300)", "minimum": 1, "maximum": 600 + }, + "tool_timeout_secs": { + "type": "integer", + "description": "Optional timeout in seconds for individual tool execution (default: 60). Prevents a single tool from consuming the entire batch timeout.", + "minimum": 1, + "maximum": 300 } } }), @@ -409,6 +424,7 @@ pub async fn execute_batch( }) .collect(), timeout_secs: None, + tool_timeout_secs: None, }; let arguments = serde_json::to_value(args) diff --git a/src/cortex-engine/src/tools/handlers/grep.rs b/src/cortex-engine/src/tools/handlers/grep.rs index 26d2561..ecef2d9 100644 --- a/src/cortex-engine/src/tools/handlers/grep.rs +++ b/src/cortex-engine/src/tools/handlers/grep.rs @@ -29,6 +29,7 @@ struct GrepArgs { glob_pattern: Option, #[serde(default = "default_output_mode")] output_mode: String, + #[serde(alias = "head_limit")] max_results: Option, #[serde(default)] multiline: bool, diff --git a/src/cortex-engine/src/tools/handlers/mod.rs b/src/cortex-engine/src/tools/handlers/mod.rs index ccfd955..8e7d3a1 100644 --- a/src/cortex-engine/src/tools/handlers/mod.rs +++ b/src/cortex-engine/src/tools/handlers/mod.rs @@ -23,9 +23,10 @@ mod web_search; pub use apply_patch::ApplyPatchHandler; pub use batch::{ BatchCallResult, BatchParams, BatchResult, BatchToolArgs, BatchToolCall, BatchToolExecutor, - BatchToolHandler, DEFAULT_BATCH_TIMEOUT_SECS, DISALLOWED_TOOLS, LegacyBatchToolCall, - MAX_BATCH_SIZE, batch_tool_definition, execute_batch, + BatchToolHandler, DISALLOWED_TOOLS, LegacyBatchToolCall, MAX_BATCH_SIZE, batch_tool_definition, + execute_batch, }; +pub use cortex_common::DEFAULT_BATCH_TIMEOUT_SECS; pub use create_agent::CreateAgentHandler; pub use edit_file::PatchHandler; diff --git a/src/cortex-engine/src/tools/unified_executor.rs b/src/cortex-engine/src/tools/unified_executor.rs index 985fefb..ed083a1 100644 --- a/src/cortex-engine/src/tools/unified_executor.rs +++ b/src/cortex-engine/src/tools/unified_executor.rs @@ -466,6 +466,7 @@ impl UnifiedToolExecutor { Ok(BatchToolArgs { calls, timeout_secs: arguments.get("timeout_secs").and_then(|v| v.as_u64()), + tool_timeout_secs: arguments.get("tool_timeout_secs").and_then(|v| v.as_u64()), }) } else { Err(CortexError::InvalidInput( diff --git a/src/cortex-exec/Cargo.toml b/src/cortex-exec/Cargo.toml index 2693c7c..de3b312 100644 --- a/src/cortex-exec/Cargo.toml +++ b/src/cortex-exec/Cargo.toml @@ -13,6 +13,7 @@ path = "src/lib.rs" workspace = true [dependencies] +cortex-common = { workspace = true } cortex-engine = { workspace = true } cortex-protocol = { workspace = true } tokio = { workspace = true, features = ["full"] } diff --git a/src/cortex-exec/src/runner.rs b/src/cortex-exec/src/runner.rs index e831324..5f24f1d 100644 --- a/src/cortex-exec/src/runner.rs +++ b/src/cortex-exec/src/runner.rs @@ -27,9 +27,17 @@ use cortex_protocol::ConversationId; use crate::output::{OutputFormat, OutputWriter}; /// Default timeout for the entire execution (10 minutes). +/// +/// This is the maximum duration for a multi-turn exec session. +/// See `cortex_common::http_client` module documentation for the complete +/// timeout hierarchy across Cortex services. const DEFAULT_TIMEOUT_SECS: u64 = 600; /// Default timeout for a single LLM request (2 minutes). +/// +/// Allows sufficient time for model inference while preventing indefinite hangs. +/// See `cortex_common::http_client` module documentation for the complete +/// timeout hierarchy across Cortex services. const DEFAULT_REQUEST_TIMEOUT_SECS: u64 = 120; /// Maximum retries for transient errors. diff --git a/src/cortex-mcp-client/src/transport.rs b/src/cortex-mcp-client/src/transport.rs index 22152cf..0ee141d 100644 --- a/src/cortex-mcp-client/src/transport.rs +++ b/src/cortex-mcp-client/src/transport.rs @@ -20,8 +20,7 @@ use cortex_mcp_types::{ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::process::{Child, Command}; use tokio::sync::{Mutex, RwLock}; -use tokio::time::sleep; -use tracing::{debug, error, info, warn}; +use tracing::{debug, info, warn}; // ============================================================================ // Transport Trait @@ -199,61 +198,6 @@ impl StdioTransport { Ok(()) } - /// Reconnect with exponential backoff. - /// - /// Properly cleans up existing connections before each attempt to prevent - /// file descriptor leaks (#2198). - #[allow(dead_code)] - async fn reconnect(&self) -> Result<()> { - if !self.reconnect_config.enabled { - return Err(anyhow!("Reconnection disabled")); - } - - let mut attempt = 0; - let mut delay = self.reconnect_config.initial_delay; - - while attempt < self.reconnect_config.max_attempts { - attempt += 1; - info!( - attempt, - max = self.reconnect_config.max_attempts, - "Attempting reconnection" - ); - - // Clean up any existing connection before attempting reconnect - // This prevents file descriptor leaks on repeated failures (#2198) - { - let mut process_guard = self.process.lock().await; - if let Some(mut child) = process_guard.take() { - // Kill the process and wait for it to clean up - let _ = child.kill().await; - // Wait a short time for resources to be released - drop(child); - } - self.connected.store(false, Ordering::SeqCst); - } - - // Clear any stale pending responses - self.pending_responses.write().await.clear(); - - match self.connect().await { - Ok(()) => { - info!("Reconnection successful"); - return Ok(()); - } - Err(e) => { - error!(error = %e, attempt, "Reconnection failed"); - if attempt < self.reconnect_config.max_attempts { - sleep(delay).await; - delay = (delay * 2).min(self.reconnect_config.max_delay); - } - } - } - } - - Err(anyhow!("Failed to reconnect after {} attempts", attempt)) - } - /// Send a request and wait for response. async fn send_request(&self, request: JsonRpcRequest) -> Result { // Ensure connected @@ -516,51 +460,6 @@ impl HttpTransport { fn next_request_id(&self) -> RequestId { RequestId::Number(self.request_id.fetch_add(1, Ordering::SeqCst) as i64) } - - /// Test connection. - #[allow(dead_code)] - async fn test_connection(&self) -> Result<()> { - let request = JsonRpcRequest::new(self.next_request_id(), methods::PING); - self.send_request(request).await?; - Ok(()) - } - - /// Reconnect with exponential backoff. - #[allow(dead_code)] - async fn reconnect(&self) -> Result<()> { - if !self.reconnect_config.enabled { - return Err(anyhow!("Reconnection disabled")); - } - - let mut attempt = 0; - let mut delay = self.reconnect_config.initial_delay; - - while attempt < self.reconnect_config.max_attempts { - attempt += 1; - info!( - attempt, - max = self.reconnect_config.max_attempts, - "Attempting HTTP reconnection" - ); - - match self.test_connection().await { - Ok(()) => { - info!("HTTP reconnection successful"); - self.connected.store(true, Ordering::SeqCst); - return Ok(()); - } - Err(e) => { - error!(error = %e, attempt, "HTTP reconnection failed"); - if attempt < self.reconnect_config.max_attempts { - sleep(delay).await; - delay = (delay * 2).min(self.reconnect_config.max_delay); - } - } - } - } - - Err(anyhow!("Failed to reconnect after {} attempts", attempt)) - } } #[async_trait] diff --git a/src/cortex-tui/src/runner/event_loop/streaming.rs b/src/cortex-tui/src/runner/event_loop/streaming.rs index 159d5d9..e1037e7 100644 --- a/src/cortex-tui/src/runner/event_loop/streaming.rs +++ b/src/cortex-tui/src/runner/event_loop/streaming.rs @@ -20,6 +20,14 @@ use cortex_engine::streaming::StreamEvent; use super::core::{EventLoop, PendingToolCall, simplify_error_message}; +/// Initial connection timeout for streaming requests. +/// See cortex_common::http_client for timeout hierarchy documentation. +const STREAMING_CONNECTION_TIMEOUT: Duration = Duration::from_secs(60); + +/// Per-chunk timeout during streaming responses. +/// See cortex_common::http_client for timeout hierarchy documentation. +const STREAMING_CHUNK_TIMEOUT: Duration = Duration::from_secs(30); + impl EventLoop { /// Handles message submission using the new provider system. /// @@ -176,7 +184,7 @@ impl EventLoop { // Start the completion request with timeout let stream_result = tokio::time::timeout( - Duration::from_secs(60), // 60 second timeout for initial connection + STREAMING_CONNECTION_TIMEOUT, client.complete(request), ) .await; @@ -215,7 +223,7 @@ impl EventLoop { // Wait for next event with timeout let event = tokio::time::timeout( - Duration::from_secs(30), // 30 second timeout between chunks + STREAMING_CHUNK_TIMEOUT, stream.next(), ) .await; @@ -738,7 +746,7 @@ impl EventLoop { }; let stream_result = - tokio::time::timeout(Duration::from_secs(60), client.complete(request)).await; + tokio::time::timeout(STREAMING_CONNECTION_TIMEOUT, client.complete(request)).await; let mut stream = match stream_result { Ok(Ok(s)) => s, @@ -770,7 +778,7 @@ impl EventLoop { break; } - let event = tokio::time::timeout(Duration::from_secs(30), stream.next()).await; + let event = tokio::time::timeout(STREAMING_CHUNK_TIMEOUT, stream.next()).await; match event { Ok(Some(Ok(ResponseEvent::Delta(delta)))) => { diff --git a/src/cortex-tui/src/runner/event_loop/subagent.rs b/src/cortex-tui/src/runner/event_loop/subagent.rs index 9283ed7..cf9314f 100644 --- a/src/cortex-tui/src/runner/event_loop/subagent.rs +++ b/src/cortex-tui/src/runner/event_loop/subagent.rs @@ -2,6 +2,14 @@ use std::time::{Duration, Instant}; +/// Connection timeout for subagent streaming requests. +/// Higher than main streaming to allow for subagent initialization. +const SUBAGENT_CONNECTION_TIMEOUT: Duration = Duration::from_secs(120); + +/// Per-event timeout during subagent responses. +/// Higher than main streaming to account for longer tool executions. +const SUBAGENT_EVENT_TIMEOUT: Duration = Duration::from_secs(60); + use crate::app::SubagentTaskDisplay; use crate::events::{SubagentEvent, ToolEvent}; use crate::session::StoredToolCall; @@ -210,7 +218,7 @@ impl EventLoop { }; let stream_result = - tokio::time::timeout(Duration::from_secs(120), client.complete(request)).await; + tokio::time::timeout(SUBAGENT_CONNECTION_TIMEOUT, client.complete(request)).await; let mut stream = match stream_result { Ok(Ok(s)) => s, @@ -252,7 +260,7 @@ impl EventLoop { let mut iteration_tool_calls: Vec<(String, String, serde_json::Value)> = Vec::new(); loop { - let event = tokio::time::timeout(Duration::from_secs(60), stream.next()).await; + let event = tokio::time::timeout(SUBAGENT_EVENT_TIMEOUT, stream.next()).await; match event { Ok(Some(Ok(ResponseEvent::Delta(delta)))) => { From a441807324e6a77693facbfb497395fa708e29dc Mon Sep 17 00:00:00 2001 From: echobt Date: Wed, 4 Feb 2026 16:11:27 +0000 Subject: [PATCH 2/3] fix: apply rustfmt formatting --- src/cortex-app-server/src/config.rs | 2 +- src/cortex-common/src/config_substitution.rs | 3 +-- src/cortex-tui/src/runner/event_loop/streaming.rs | 13 +++---------- src/cortex-tui/src/runner/event_loop/subagent.rs | 3 ++- 4 files changed, 7 insertions(+), 14 deletions(-) diff --git a/src/cortex-app-server/src/config.rs b/src/cortex-app-server/src/config.rs index b37389b..92be050 100644 --- a/src/cortex-app-server/src/config.rs +++ b/src/cortex-app-server/src/config.rs @@ -86,7 +86,7 @@ pub struct ServerConfig { fn default_shutdown_timeout() -> u64 { 30 // 30 seconds for graceful shutdown - // See cortex_common::http_client for timeout hierarchy documentation + // See cortex_common::http_client for timeout hierarchy documentation } fn default_listen_addr() -> String { diff --git a/src/cortex-common/src/config_substitution.rs b/src/cortex-common/src/config_substitution.rs index 4ecb861..ed8c6b5 100644 --- a/src/cortex-common/src/config_substitution.rs +++ b/src/cortex-common/src/config_substitution.rs @@ -15,8 +15,7 @@ use thiserror::Error; /// Group 1: variable name /// Group 2: optional default value (after second colon) static ENV_REGEX: LazyLock = LazyLock::new(|| { - Regex::new(r"\{env:([^:}]+)(?::([^}]*))?\}") - .expect("env regex pattern is valid and tested") + Regex::new(r"\{env:([^:}]+)(?::([^}]*))?\}").expect("env regex pattern is valid and tested") }); /// Static regex for file content substitution: {file:path} diff --git a/src/cortex-tui/src/runner/event_loop/streaming.rs b/src/cortex-tui/src/runner/event_loop/streaming.rs index e1037e7..1539687 100644 --- a/src/cortex-tui/src/runner/event_loop/streaming.rs +++ b/src/cortex-tui/src/runner/event_loop/streaming.rs @@ -183,11 +183,8 @@ impl EventLoop { }; // Start the completion request with timeout - let stream_result = tokio::time::timeout( - STREAMING_CONNECTION_TIMEOUT, - client.complete(request), - ) - .await; + let stream_result = + tokio::time::timeout(STREAMING_CONNECTION_TIMEOUT, client.complete(request)).await; let mut stream = match stream_result { Ok(Ok(s)) => s, @@ -222,11 +219,7 @@ impl EventLoop { } // Wait for next event with timeout - let event = tokio::time::timeout( - STREAMING_CHUNK_TIMEOUT, - stream.next(), - ) - .await; + let event = tokio::time::timeout(STREAMING_CHUNK_TIMEOUT, stream.next()).await; match event { Ok(Some(Ok(ResponseEvent::Delta(delta)))) => { diff --git a/src/cortex-tui/src/runner/event_loop/subagent.rs b/src/cortex-tui/src/runner/event_loop/subagent.rs index cf9314f..a0b0c35 100644 --- a/src/cortex-tui/src/runner/event_loop/subagent.rs +++ b/src/cortex-tui/src/runner/event_loop/subagent.rs @@ -218,7 +218,8 @@ impl EventLoop { }; let stream_result = - tokio::time::timeout(SUBAGENT_CONNECTION_TIMEOUT, client.complete(request)).await; + tokio::time::timeout(SUBAGENT_CONNECTION_TIMEOUT, client.complete(request)) + .await; let mut stream = match stream_result { Ok(Ok(s)) => s, From 4db1a2582f9d7bf8ec1057ede2f0e629b61b2ae4 Mon Sep 17 00:00:00 2001 From: echobt Date: Wed, 4 Feb 2026 16:25:37 +0000 Subject: [PATCH 3/3] fix: allow assertions_on_constants for compile-time test assertions --- src/cortex-common/src/timeout.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cortex-common/src/timeout.rs b/src/cortex-common/src/timeout.rs index 1f7ec37..95143c8 100644 --- a/src/cortex-common/src/timeout.rs +++ b/src/cortex-common/src/timeout.rs @@ -49,6 +49,7 @@ mod tests { use super::*; #[test] + #[allow(clippy::assertions_on_constants)] fn test_timeout_values_are_reasonable() { // Exec timeout should be greater than request timeout assert!(DEFAULT_EXEC_TIMEOUT_SECS > DEFAULT_REQUEST_TIMEOUT_SECS);