From b6e1ccfdcd54a4e075e91a40360eb297ad380836 Mon Sep 17 00:00:00 2001 From: echobt Date: Wed, 4 Feb 2026 17:02:30 +0000 Subject: [PATCH 1/2] chore: consolidated refactoring and documentation improvements This PR consolidates all refactoring and documentation improvements from PRs #77 and #89. ## Refactoring (from PR #77) ### Timeout Constants - Created centralized cortex_common::timeout module with documented constants - Replaced scattered magic numbers with well-documented constants - Added comprehensive documentation for timeout hierarchy ### Code Quality - Used LazyLock for static regex initialization - Extracted subagent timeout values as named constants - Improved compile-time assertions ## Documentation (from PR #89) ### Timeout Hierarchy Documentation - Added detailed header documentation explaining timeout hierarchy - Documented each timeout constant with its purpose and use case - Established naming conventions for constants vs config fields - Added timeout hierarchy table for quick reference ### Timeout Reference Table | Use Case | Module | Constant | Value | Rationale | |-----------------------------|-----------------------------|-----------------------------|-------|-----------| | Health checks | cortex-common/http_client | HEALTH_CHECK_TIMEOUT | 5s | Quick validation | | Standard HTTP requests | cortex-common/http_client | DEFAULT_TIMEOUT | 30s | Normal API calls | | Connection pool idle | cortex-common/http_client | POOL_IDLE_TIMEOUT | 60s | DNS refresh | | LLM Request (non-streaming) | cortex-exec/runner | DEFAULT_REQUEST_TIMEOUT_SECS| 120s | Model inference | | LLM Streaming total | cortex-common/http_client | STREAMING_TIMEOUT | 300s | Long-running streams | ## Files Modified - src/cortex-common/src/timeout.rs (new) - src/cortex-common/src/http_client.rs - src/cortex-app-server/src/config.rs - src/cortex-exec/src/runner.rs - Multiple other files Closes #77, #89 --- Cargo.lock | 1 + src/cortex-app-server/src/auth.rs | 6 +- src/cortex-app-server/src/config.rs | 13 ++- src/cortex-app-server/src/storage.rs | 3 - src/cortex-app-server/src/streaming.rs | 4 +- src/cortex-apply-patch/src/hunk.rs | 11 -- src/cortex-common/src/config_substitution.rs | 49 +++++---- src/cortex-common/src/http_client.rs | 84 +++++++++++++- src/cortex-common/src/lib.rs | 6 + src/cortex-common/src/timeout.rs | 66 +++++++++++ src/cortex-engine/src/tools/handlers/batch.rs | 40 +++++-- src/cortex-engine/src/tools/handlers/grep.rs | 1 + src/cortex-engine/src/tools/handlers/mod.rs | 5 +- .../src/tools/unified_executor.rs | 1 + src/cortex-exec/Cargo.toml | 1 + src/cortex-exec/src/runner.rs | 8 ++ src/cortex-mcp-client/src/transport.rs | 103 +----------------- .../src/runner/event_loop/streaming.rs | 25 +++-- .../src/runner/event_loop/subagent.rs | 13 ++- 19 files changed, 263 insertions(+), 177 deletions(-) create mode 100644 src/cortex-common/src/timeout.rs diff --git a/Cargo.lock b/Cargo.lock index de5594c..f2363d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1204,6 +1204,7 @@ dependencies = [ name = "cortex-exec" version = "0.0.6" dependencies = [ + "cortex-common", "cortex-engine", "cortex-protocol", "serde_json", diff --git a/src/cortex-app-server/src/auth.rs b/src/cortex-app-server/src/auth.rs index 414f36f..4f240c3 100644 --- a/src/cortex-app-server/src/auth.rs +++ b/src/cortex-app-server/src/auth.rs @@ -45,7 +45,7 @@ impl Claims { pub fn new(user_id: impl Into, expiry_seconds: u64) -> Self { let now = SystemTime::now() .duration_since(UNIX_EPOCH) - .unwrap() + .unwrap_or_default() .as_secs(); Self { @@ -75,7 +75,7 @@ impl Claims { pub fn is_expired(&self) -> bool { let now = SystemTime::now() .duration_since(UNIX_EPOCH) - .unwrap() + .unwrap_or_default() .as_secs(); self.exp < now } @@ -187,7 +187,7 @@ impl AuthService { pub async fn cleanup_revoked_tokens(&self) { let now = SystemTime::now() .duration_since(UNIX_EPOCH) - .unwrap() + .unwrap_or_default() .as_secs(); let mut revoked = self.revoked_tokens.write().await; diff --git a/src/cortex-app-server/src/config.rs b/src/cortex-app-server/src/config.rs index 35ac75b..91ad832 100644 --- a/src/cortex-app-server/src/config.rs +++ b/src/cortex-app-server/src/config.rs @@ -49,12 +49,18 @@ pub struct ServerConfig { pub max_body_size: usize, /// Request timeout in seconds (applies to full request lifecycle). + /// + /// See `cortex_common::http_client` module documentation for the complete + /// timeout hierarchy across Cortex services. #[serde(default = "default_request_timeout")] pub request_timeout: u64, /// Read timeout for individual chunks in seconds. /// Applies to chunked transfer encoding to prevent indefinite hangs /// when clients disconnect without sending the terminal chunk. + /// + /// See `cortex_common::http_client` module documentation for the complete + /// timeout hierarchy across Cortex services. #[serde(default = "default_read_timeout")] pub read_timeout: u64, @@ -71,12 +77,17 @@ pub struct ServerConfig { pub cors_origins: Vec, /// Graceful shutdown timeout in seconds. + /// + /// See `cortex_common::http_client` module documentation for the complete + /// timeout hierarchy across Cortex services. #[serde(default = "default_shutdown_timeout")] pub shutdown_timeout: u64, } fn default_shutdown_timeout() -> u64 { - 30 // 30 seconds for graceful shutdown + // 30 seconds for graceful shutdown + // See cortex_common::http_client for timeout hierarchy documentation + 30 } fn default_listen_addr() -> String { diff --git a/src/cortex-app-server/src/storage.rs b/src/cortex-app-server/src/storage.rs index 6c5d44e..1aa617f 100644 --- a/src/cortex-app-server/src/storage.rs +++ b/src/cortex-app-server/src/storage.rs @@ -47,8 +47,6 @@ pub struct StoredToolCall { /// Session storage manager. pub struct SessionStorage { - #[allow(dead_code)] - base_dir: PathBuf, sessions_dir: PathBuf, history_dir: PathBuf, } @@ -66,7 +64,6 @@ impl SessionStorage { info!("Session storage initialized at {:?}", base_dir); Ok(Self { - base_dir, sessions_dir, history_dir, }) diff --git a/src/cortex-app-server/src/streaming.rs b/src/cortex-app-server/src/streaming.rs index 840b1a6..86300c8 100644 --- a/src/cortex-app-server/src/streaming.rs +++ b/src/cortex-app-server/src/streaming.rs @@ -510,10 +510,10 @@ async fn session_events_stream( serde_json::to_string(&StreamEvent::Ping { timestamp: std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) - .unwrap() + .unwrap_or_default() .as_secs(), }) - .unwrap(), + .unwrap_or_default(), ))) .await; diff --git a/src/cortex-apply-patch/src/hunk.rs b/src/cortex-apply-patch/src/hunk.rs index ea67a97..ab5b1f1 100644 --- a/src/cortex-apply-patch/src/hunk.rs +++ b/src/cortex-apply-patch/src/hunk.rs @@ -250,9 +250,6 @@ pub struct SearchReplace { pub search: String, /// The text to replace with. pub replace: String, - /// Replace all occurrences (true) or just the first (false). - #[allow(dead_code)] - pub replace_all: bool, } impl SearchReplace { @@ -266,16 +263,8 @@ impl SearchReplace { path: path.into(), search: search.into(), replace: replace.into(), - replace_all: false, } } - - /// Set whether to replace all occurrences. - #[allow(dead_code)] - pub fn with_replace_all(mut self, replace_all: bool) -> Self { - self.replace_all = replace_all; - self - } } #[cfg(test)] diff --git a/src/cortex-common/src/config_substitution.rs b/src/cortex-common/src/config_substitution.rs index 49bff00..ed8c6b5 100644 --- a/src/cortex-common/src/config_substitution.rs +++ b/src/cortex-common/src/config_substitution.rs @@ -8,8 +8,22 @@ use regex::Regex; use std::path::PathBuf; +use std::sync::LazyLock; use thiserror::Error; +/// Static regex for environment variable substitution: {env:VAR} or {env:VAR:default} +/// Group 1: variable name +/// Group 2: optional default value (after second colon) +static ENV_REGEX: LazyLock = LazyLock::new(|| { + Regex::new(r"\{env:([^:}]+)(?::([^}]*))?\}").expect("env regex pattern is valid and tested") +}); + +/// Static regex for file content substitution: {file:path} +/// Group 1: file path +static FILE_REGEX: LazyLock = LazyLock::new(|| { + Regex::new(r"\{file:([^}]+)\}").expect("file regex pattern is valid and tested") +}); + /// Errors that can occur during configuration substitution. #[derive(Debug, Error)] pub enum SubstitutionError { @@ -42,11 +56,13 @@ pub enum SubstitutionError { /// /// Handles replacement of `{env:...}` and `{file:...}` placeholders /// in configuration strings. +/// +/// This struct uses statically initialized regex patterns via `LazyLock`, +/// making regex compilation a one-time cost shared across all instances. pub struct ConfigSubstitution { - /// Regex for environment variable substitution: {env:VAR} or {env:VAR:default} - env_regex: Regex, - /// Regex for file content substitution: {file:path} - file_regex: Regex, + // This struct is kept for API compatibility. + // Regex patterns are now static module-level constants. + _private: (), } impl Default for ConfigSubstitution { @@ -56,22 +72,13 @@ impl Default for ConfigSubstitution { } impl ConfigSubstitution { - /// Creates a new `ConfigSubstitution` instance with compiled regex patterns. + /// Creates a new `ConfigSubstitution` instance. + /// + /// The regex patterns are statically initialized on first use, + /// so creating multiple instances has no additional cost. #[must_use] pub fn new() -> Self { - Self { - // Matches {env:VAR_NAME} or {env:VAR_NAME:default_value} - // Group 1: variable name - // Group 2: optional default value (after second colon) - env_regex: Regex::new(r"\{env:([^:}]+)(?::([^}]*))?\}").unwrap_or_else(|e| { - panic!("Failed to compile env regex: {e}"); - }), - // Matches {file:path} - // Group 1: file path - file_regex: Regex::new(r"\{file:([^}]+)\}").unwrap_or_else(|e| { - panic!("Failed to compile file regex: {e}"); - }), - } + Self { _private: () } } /// Substitutes all variables in a string. @@ -109,8 +116,7 @@ impl ConfigSubstitution { let mut error: Option = None; // Collect all matches first to avoid borrowing issues - let matches: Vec<_> = self - .env_regex + let matches: Vec<_> = ENV_REGEX .captures_iter(input) .map(|cap| { let full_match = cap.get(0).map(|m| m.as_str().to_string()); @@ -155,8 +161,7 @@ impl ConfigSubstitution { let mut error: Option = None; // Collect all matches first - let matches: Vec<_> = self - .file_regex + let matches: Vec<_> = FILE_REGEX .captures_iter(input) .map(|cap| { let full_match = cap.get(0).map(|m| m.as_str().to_string()); diff --git a/src/cortex-common/src/http_client.rs b/src/cortex-common/src/http_client.rs index b181ac8..18f1c1d 100644 --- a/src/cortex-common/src/http_client.rs +++ b/src/cortex-common/src/http_client.rs @@ -9,25 +9,99 @@ //! //! DNS caching is configured with reasonable TTL to allow failover and load //! balancer updates (#2177). +//! +//! # Timeout Configuration Guide +//! +//! This section documents the timeout hierarchy across the Cortex codebase. Use this +//! as a reference when configuring timeouts for new features or debugging timeout issues. +//! +//! ## Timeout Hierarchy +//! +//! | Use Case | Timeout | Constant/Location | Rationale | +//! |-----------------------------|---------|--------------------------------------------|-----------------------------------------| +//! | Health checks | 5s | `HEALTH_CHECK_TIMEOUT` (this module) | Quick validation of service status | +//! | Standard HTTP requests | 30s | `DEFAULT_TIMEOUT` (this module) | Normal API calls with reasonable margin | +//! | Per-chunk read (streaming) | 30s | `read_timeout` (cortex-app-server/config) | Individual chunk timeout during stream | +//! | Pool idle timeout | 60s | `POOL_IDLE_TIMEOUT` (this module) | DNS re-resolution for failover | +//! | LLM Request (non-streaming) | 120s | `DEFAULT_REQUEST_TIMEOUT_SECS` (cortex-exec/runner) | Model inference takes time | +//! | LLM Streaming total | 300s | `STREAMING_TIMEOUT` (this module) | Long-running streaming responses | +//! | Server request lifecycle | 300s | `request_timeout` (cortex-app-server/config) | Full HTTP request/response cycle | +//! | Entire exec session | 600s | `DEFAULT_TIMEOUT_SECS` (cortex-exec/runner) | Multi-turn conversation limit | +//! | Graceful shutdown | 30s | `shutdown_timeout` (cortex-app-server/config) | Time for cleanup on shutdown | +//! +//! ## Module-Specific Timeouts +//! +//! ### cortex-common (this module) +//! - `DEFAULT_TIMEOUT` (30s): Use for standard API calls. +//! - `STREAMING_TIMEOUT` (300s): Use for LLM streaming endpoints. +//! - `HEALTH_CHECK_TIMEOUT` (5s): Use for health/readiness checks. +//! - `POOL_IDLE_TIMEOUT` (60s): Connection pool cleanup for DNS freshness. +//! +//! ### cortex-exec (runner.rs) +//! - `DEFAULT_TIMEOUT_SECS` (600s): Maximum duration for entire exec session. +//! - `DEFAULT_REQUEST_TIMEOUT_SECS` (120s): Single LLM request timeout. +//! +//! ### cortex-app-server (config.rs) +//! - `request_timeout` (300s): Full request lifecycle timeout. +//! - `read_timeout` (30s): Per-chunk timeout for streaming reads. +//! - `shutdown_timeout` (30s): Graceful shutdown duration. +//! +//! ### cortex-engine (api_client.rs) +//! - Re-exports constants from this module for consistency. +//! +//! ## Recommendations +//! +//! When adding new timeout configurations: +//! 1. Use constants from this module when possible for consistency. +//! 2. Document any new timeout constants with their rationale. +//! 3. Consider the timeout hierarchy - inner timeouts should be shorter than outer ones. +//! 4. For LLM operations, use longer timeouts (120s-300s) to accommodate model inference. +//! 5. For health checks and quick validations, use short timeouts (5s-10s). use reqwest::Client; use std::time::Duration; +// ============================================================ +// Timeout Configuration Constants +// ============================================================ +// +// Timeout Hierarchy Documentation +// =============================== +// +// This module defines the authoritative timeout constants for HTTP operations. +// Other modules should reference these constants or document deviations. +// +// Timeout Categories: +// - Request timeouts: Total time for a complete request/response cycle +// - Connection timeouts: Time to establish TCP connection +// - Read timeouts: Time to receive response data +// - Pool timeouts: How long idle connections stay in pool +// +// Recommended Naming Convention: +// - Constants: SCREAMING_SNAKE_CASE with Duration type +// - Config fields: snake_case with _secs suffix for u64 values +// +// ============================================================ + /// User-Agent string for all HTTP requests pub const USER_AGENT: &str = concat!("cortex-cli/", env!("CARGO_PKG_VERSION")); -/// Default timeout for standard API requests (30 seconds) +/// Default timeout for standard HTTP requests (30 seconds). +/// Used for non-streaming API calls, health checks with retries, etc. pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); -/// Extended timeout for LLM streaming requests (5 minutes) +/// Timeout for streaming HTTP requests (5 minutes). +/// Longer duration to accommodate LLM inference time. pub const STREAMING_TIMEOUT: Duration = Duration::from_secs(300); -/// Short timeout for health checks (5 seconds) +/// Timeout for health check requests (5 seconds). +/// Short timeout since health endpoints should respond quickly. pub const HEALTH_CHECK_TIMEOUT: Duration = Duration::from_secs(5); -/// Connection pool idle timeout to ensure DNS is re-resolved periodically. +/// Idle timeout for connection pool (60 seconds). +/// Connections are closed after being idle for this duration +/// to allow DNS re-resolution for services behind load balancers. /// This helps with failover scenarios where DNS records change (#2177). -/// Set to 60 seconds to balance between performance and DNS freshness. pub const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(60); /// Creates an HTTP client with default configuration (30s timeout). diff --git a/src/cortex-common/src/lib.rs b/src/cortex-common/src/lib.rs index 0f30c44..4cfc023 100644 --- a/src/cortex-common/src/lib.rs +++ b/src/cortex-common/src/lib.rs @@ -18,6 +18,7 @@ pub mod signal_safety; pub mod subprocess_env; pub mod subprocess_output; pub mod text_sanitize; +pub mod timeout; pub mod truncate; #[cfg(feature = "cli")] @@ -73,6 +74,11 @@ pub use subprocess_output::{ pub use text_sanitize::{ has_control_chars, normalize_code_fences, sanitize_control_chars, sanitize_for_terminal, }; +pub use timeout::{ + DEFAULT_BATCH_TIMEOUT_SECS, DEFAULT_EXEC_TIMEOUT_SECS, DEFAULT_HEALTH_CHECK_TIMEOUT_SECS, + DEFAULT_READ_TIMEOUT_SECS, DEFAULT_REQUEST_TIMEOUT_SECS, DEFAULT_SHUTDOWN_TIMEOUT_SECS, + DEFAULT_STREAMING_TIMEOUT_SECS, +}; pub use truncate::{ truncate_command, truncate_first_line, truncate_for_display, truncate_id, truncate_id_default, truncate_model_name, truncate_with_ellipsis, truncate_with_unicode_ellipsis, diff --git a/src/cortex-common/src/timeout.rs b/src/cortex-common/src/timeout.rs new file mode 100644 index 0000000..95143c8 --- /dev/null +++ b/src/cortex-common/src/timeout.rs @@ -0,0 +1,66 @@ +//! Centralized timeout constants for the Cortex CLI. +//! +//! This module provides consistent timeout values used throughout the codebase. +//! Centralizing these values ensures uniformity and makes it easier to adjust +//! timeouts across the application. + +/// Default timeout for the entire execution in seconds (10 minutes). +/// +/// This is the maximum time allowed for a complete headless execution, +/// including all LLM requests and tool executions. +pub const DEFAULT_EXEC_TIMEOUT_SECS: u64 = 600; + +/// Default timeout for a single LLM request in seconds (2 minutes). +/// +/// This is the maximum time to wait for a single completion request +/// to the LLM provider. +pub const DEFAULT_REQUEST_TIMEOUT_SECS: u64 = 120; + +/// Default timeout for streaming responses in seconds (5 minutes). +/// +/// Extended timeout for LLM streaming requests where responses are +/// delivered incrementally over time. +pub const DEFAULT_STREAMING_TIMEOUT_SECS: u64 = 300; + +/// Default timeout for health check requests in seconds (5 seconds). +/// +/// Short timeout used for quick health check endpoints. +pub const DEFAULT_HEALTH_CHECK_TIMEOUT_SECS: u64 = 5; + +/// Default timeout for graceful shutdown in seconds (30 seconds). +/// +/// Maximum time to wait for in-flight operations to complete during +/// shutdown before forcing termination. +pub const DEFAULT_SHUTDOWN_TIMEOUT_SECS: u64 = 30; + +/// Default timeout for batch execution in seconds (5 minutes). +/// +/// Maximum time allowed for executing a batch of parallel tool calls. +pub const DEFAULT_BATCH_TIMEOUT_SECS: u64 = 300; + +/// Default timeout for individual read operations in seconds (30 seconds). +/// +/// Timeout for individual read operations to prevent hangs when +/// Content-Length doesn't match actual body size. +pub const DEFAULT_READ_TIMEOUT_SECS: u64 = 30; + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + #[allow(clippy::assertions_on_constants)] + fn test_timeout_values_are_reasonable() { + // Exec timeout should be greater than request timeout + assert!(DEFAULT_EXEC_TIMEOUT_SECS > DEFAULT_REQUEST_TIMEOUT_SECS); + + // Streaming timeout should be greater than request timeout + assert!(DEFAULT_STREAMING_TIMEOUT_SECS > DEFAULT_REQUEST_TIMEOUT_SECS); + + // Health check should be short + assert!(DEFAULT_HEALTH_CHECK_TIMEOUT_SECS <= 10); + + // Batch timeout should be reasonable + assert!(DEFAULT_BATCH_TIMEOUT_SECS >= 60); + } +} diff --git a/src/cortex-engine/src/tools/handlers/batch.rs b/src/cortex-engine/src/tools/handlers/batch.rs index 81ec81f..e3d4542 100644 --- a/src/cortex-engine/src/tools/handlers/batch.rs +++ b/src/cortex-engine/src/tools/handlers/batch.rs @@ -20,8 +20,9 @@ use crate::tools::spec::{ToolDefinition, ToolHandler, ToolResult}; /// Maximum number of tools that can be executed in a batch. pub const MAX_BATCH_SIZE: usize = 10; -/// Default timeout for batch execution in seconds. -pub const DEFAULT_BATCH_TIMEOUT_SECS: u64 = 300; +/// Default timeout for individual tool execution in seconds. +/// This prevents a single tool from consuming the entire batch timeout. +pub const DEFAULT_TOOL_TIMEOUT_SECS: u64 = 60; /// Tools that cannot be called within a batch (prevent recursion). /// Note: Task is now allowed to enable parallel task execution. @@ -45,6 +46,10 @@ pub struct BatchToolArgs { /// Optional timeout in seconds for the entire batch (default: 300s). #[serde(default)] pub timeout_secs: Option, + /// Optional timeout in seconds for individual tools (default: 60s). + /// This prevents a single tool from consuming the entire batch timeout. + #[serde(default)] + pub tool_timeout_secs: Option, } /// Result of a single tool call within the batch. @@ -158,7 +163,7 @@ impl BatchToolHandler { &self, calls: Vec, context: &ToolContext, - timeout_duration: Duration, + tool_timeout: Duration, ) -> BatchResult { let start_time = Instant::now(); let results = Arc::new(Mutex::new(Vec::with_capacity(calls.len()))); @@ -176,9 +181,9 @@ impl BatchToolHandler { async move { let call_start = Instant::now(); - // Execute with per-call timeout (use batch timeout for each call) + // Execute with per-tool timeout to prevent single tools from blocking others let execution_result = timeout( - timeout_duration, + tool_timeout, executor.execute_tool(&call.tool, call.arguments, &ctx), ) .await; @@ -202,7 +207,7 @@ impl BatchToolHandler { duration_ms, }, Ok(Err(e)) => BatchCallResult { - tool: tool_name, + tool: tool_name.clone(), index, success: false, result: None, @@ -210,11 +215,15 @@ impl BatchToolHandler { duration_ms, }, Err(_) => BatchCallResult { - tool: tool_name, + tool: tool_name.clone(), index, success: false, result: None, - error: Some(format!("Timeout after {}s", timeout_duration.as_secs())), + error: Some(format!( + "Tool '{}' timed out after {}s", + tool_name, + tool_timeout.as_secs() + )), duration_ms, }, }; @@ -328,13 +337,13 @@ impl ToolHandler for BatchToolHandler { // Validate calls self.validate_calls(&args.calls)?; - // Determine timeout - let timeout_secs = args.timeout_secs.unwrap_or(DEFAULT_BATCH_TIMEOUT_SECS); - let timeout_duration = Duration::from_secs(timeout_secs); + // Determine per-tool timeout (prevents single tool from blocking others) + let tool_timeout_secs = args.tool_timeout_secs.unwrap_or(DEFAULT_TOOL_TIMEOUT_SECS); + let tool_timeout = Duration::from_secs(tool_timeout_secs); // Execute all tools in parallel let batch_result = self - .execute_parallel(args.calls, context, timeout_duration) + .execute_parallel(args.calls, context, tool_timeout) .await; // Format output @@ -384,6 +393,12 @@ pub fn batch_tool_definition() -> ToolDefinition { "description": "Optional timeout in seconds for the entire batch execution (default: 300)", "minimum": 1, "maximum": 600 + }, + "tool_timeout_secs": { + "type": "integer", + "description": "Optional timeout in seconds for individual tool execution (default: 60). Prevents a single tool from consuming the entire batch timeout.", + "minimum": 1, + "maximum": 300 } } }), @@ -409,6 +424,7 @@ pub async fn execute_batch( }) .collect(), timeout_secs: None, + tool_timeout_secs: None, }; let arguments = serde_json::to_value(args) diff --git a/src/cortex-engine/src/tools/handlers/grep.rs b/src/cortex-engine/src/tools/handlers/grep.rs index 26d2561..ecef2d9 100644 --- a/src/cortex-engine/src/tools/handlers/grep.rs +++ b/src/cortex-engine/src/tools/handlers/grep.rs @@ -29,6 +29,7 @@ struct GrepArgs { glob_pattern: Option, #[serde(default = "default_output_mode")] output_mode: String, + #[serde(alias = "head_limit")] max_results: Option, #[serde(default)] multiline: bool, diff --git a/src/cortex-engine/src/tools/handlers/mod.rs b/src/cortex-engine/src/tools/handlers/mod.rs index ccfd955..8e7d3a1 100644 --- a/src/cortex-engine/src/tools/handlers/mod.rs +++ b/src/cortex-engine/src/tools/handlers/mod.rs @@ -23,9 +23,10 @@ mod web_search; pub use apply_patch::ApplyPatchHandler; pub use batch::{ BatchCallResult, BatchParams, BatchResult, BatchToolArgs, BatchToolCall, BatchToolExecutor, - BatchToolHandler, DEFAULT_BATCH_TIMEOUT_SECS, DISALLOWED_TOOLS, LegacyBatchToolCall, - MAX_BATCH_SIZE, batch_tool_definition, execute_batch, + BatchToolHandler, DISALLOWED_TOOLS, LegacyBatchToolCall, MAX_BATCH_SIZE, batch_tool_definition, + execute_batch, }; +pub use cortex_common::DEFAULT_BATCH_TIMEOUT_SECS; pub use create_agent::CreateAgentHandler; pub use edit_file::PatchHandler; diff --git a/src/cortex-engine/src/tools/unified_executor.rs b/src/cortex-engine/src/tools/unified_executor.rs index 985fefb..ed083a1 100644 --- a/src/cortex-engine/src/tools/unified_executor.rs +++ b/src/cortex-engine/src/tools/unified_executor.rs @@ -466,6 +466,7 @@ impl UnifiedToolExecutor { Ok(BatchToolArgs { calls, timeout_secs: arguments.get("timeout_secs").and_then(|v| v.as_u64()), + tool_timeout_secs: arguments.get("tool_timeout_secs").and_then(|v| v.as_u64()), }) } else { Err(CortexError::InvalidInput( diff --git a/src/cortex-exec/Cargo.toml b/src/cortex-exec/Cargo.toml index 2693c7c..de3b312 100644 --- a/src/cortex-exec/Cargo.toml +++ b/src/cortex-exec/Cargo.toml @@ -13,6 +13,7 @@ path = "src/lib.rs" workspace = true [dependencies] +cortex-common = { workspace = true } cortex-engine = { workspace = true } cortex-protocol = { workspace = true } tokio = { workspace = true, features = ["full"] } diff --git a/src/cortex-exec/src/runner.rs b/src/cortex-exec/src/runner.rs index e831324..5f24f1d 100644 --- a/src/cortex-exec/src/runner.rs +++ b/src/cortex-exec/src/runner.rs @@ -27,9 +27,17 @@ use cortex_protocol::ConversationId; use crate::output::{OutputFormat, OutputWriter}; /// Default timeout for the entire execution (10 minutes). +/// +/// This is the maximum duration for a multi-turn exec session. +/// See `cortex_common::http_client` module documentation for the complete +/// timeout hierarchy across Cortex services. const DEFAULT_TIMEOUT_SECS: u64 = 600; /// Default timeout for a single LLM request (2 minutes). +/// +/// Allows sufficient time for model inference while preventing indefinite hangs. +/// See `cortex_common::http_client` module documentation for the complete +/// timeout hierarchy across Cortex services. const DEFAULT_REQUEST_TIMEOUT_SECS: u64 = 120; /// Maximum retries for transient errors. diff --git a/src/cortex-mcp-client/src/transport.rs b/src/cortex-mcp-client/src/transport.rs index 22152cf..0ee141d 100644 --- a/src/cortex-mcp-client/src/transport.rs +++ b/src/cortex-mcp-client/src/transport.rs @@ -20,8 +20,7 @@ use cortex_mcp_types::{ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::process::{Child, Command}; use tokio::sync::{Mutex, RwLock}; -use tokio::time::sleep; -use tracing::{debug, error, info, warn}; +use tracing::{debug, info, warn}; // ============================================================================ // Transport Trait @@ -199,61 +198,6 @@ impl StdioTransport { Ok(()) } - /// Reconnect with exponential backoff. - /// - /// Properly cleans up existing connections before each attempt to prevent - /// file descriptor leaks (#2198). - #[allow(dead_code)] - async fn reconnect(&self) -> Result<()> { - if !self.reconnect_config.enabled { - return Err(anyhow!("Reconnection disabled")); - } - - let mut attempt = 0; - let mut delay = self.reconnect_config.initial_delay; - - while attempt < self.reconnect_config.max_attempts { - attempt += 1; - info!( - attempt, - max = self.reconnect_config.max_attempts, - "Attempting reconnection" - ); - - // Clean up any existing connection before attempting reconnect - // This prevents file descriptor leaks on repeated failures (#2198) - { - let mut process_guard = self.process.lock().await; - if let Some(mut child) = process_guard.take() { - // Kill the process and wait for it to clean up - let _ = child.kill().await; - // Wait a short time for resources to be released - drop(child); - } - self.connected.store(false, Ordering::SeqCst); - } - - // Clear any stale pending responses - self.pending_responses.write().await.clear(); - - match self.connect().await { - Ok(()) => { - info!("Reconnection successful"); - return Ok(()); - } - Err(e) => { - error!(error = %e, attempt, "Reconnection failed"); - if attempt < self.reconnect_config.max_attempts { - sleep(delay).await; - delay = (delay * 2).min(self.reconnect_config.max_delay); - } - } - } - } - - Err(anyhow!("Failed to reconnect after {} attempts", attempt)) - } - /// Send a request and wait for response. async fn send_request(&self, request: JsonRpcRequest) -> Result { // Ensure connected @@ -516,51 +460,6 @@ impl HttpTransport { fn next_request_id(&self) -> RequestId { RequestId::Number(self.request_id.fetch_add(1, Ordering::SeqCst) as i64) } - - /// Test connection. - #[allow(dead_code)] - async fn test_connection(&self) -> Result<()> { - let request = JsonRpcRequest::new(self.next_request_id(), methods::PING); - self.send_request(request).await?; - Ok(()) - } - - /// Reconnect with exponential backoff. - #[allow(dead_code)] - async fn reconnect(&self) -> Result<()> { - if !self.reconnect_config.enabled { - return Err(anyhow!("Reconnection disabled")); - } - - let mut attempt = 0; - let mut delay = self.reconnect_config.initial_delay; - - while attempt < self.reconnect_config.max_attempts { - attempt += 1; - info!( - attempt, - max = self.reconnect_config.max_attempts, - "Attempting HTTP reconnection" - ); - - match self.test_connection().await { - Ok(()) => { - info!("HTTP reconnection successful"); - self.connected.store(true, Ordering::SeqCst); - return Ok(()); - } - Err(e) => { - error!(error = %e, attempt, "HTTP reconnection failed"); - if attempt < self.reconnect_config.max_attempts { - sleep(delay).await; - delay = (delay * 2).min(self.reconnect_config.max_delay); - } - } - } - } - - Err(anyhow!("Failed to reconnect after {} attempts", attempt)) - } } #[async_trait] diff --git a/src/cortex-tui/src/runner/event_loop/streaming.rs b/src/cortex-tui/src/runner/event_loop/streaming.rs index 159d5d9..1539687 100644 --- a/src/cortex-tui/src/runner/event_loop/streaming.rs +++ b/src/cortex-tui/src/runner/event_loop/streaming.rs @@ -20,6 +20,14 @@ use cortex_engine::streaming::StreamEvent; use super::core::{EventLoop, PendingToolCall, simplify_error_message}; +/// Initial connection timeout for streaming requests. +/// See cortex_common::http_client for timeout hierarchy documentation. +const STREAMING_CONNECTION_TIMEOUT: Duration = Duration::from_secs(60); + +/// Per-chunk timeout during streaming responses. +/// See cortex_common::http_client for timeout hierarchy documentation. +const STREAMING_CHUNK_TIMEOUT: Duration = Duration::from_secs(30); + impl EventLoop { /// Handles message submission using the new provider system. /// @@ -175,11 +183,8 @@ impl EventLoop { }; // Start the completion request with timeout - let stream_result = tokio::time::timeout( - Duration::from_secs(60), // 60 second timeout for initial connection - client.complete(request), - ) - .await; + let stream_result = + tokio::time::timeout(STREAMING_CONNECTION_TIMEOUT, client.complete(request)).await; let mut stream = match stream_result { Ok(Ok(s)) => s, @@ -214,11 +219,7 @@ impl EventLoop { } // Wait for next event with timeout - let event = tokio::time::timeout( - Duration::from_secs(30), // 30 second timeout between chunks - stream.next(), - ) - .await; + let event = tokio::time::timeout(STREAMING_CHUNK_TIMEOUT, stream.next()).await; match event { Ok(Some(Ok(ResponseEvent::Delta(delta)))) => { @@ -738,7 +739,7 @@ impl EventLoop { }; let stream_result = - tokio::time::timeout(Duration::from_secs(60), client.complete(request)).await; + tokio::time::timeout(STREAMING_CONNECTION_TIMEOUT, client.complete(request)).await; let mut stream = match stream_result { Ok(Ok(s)) => s, @@ -770,7 +771,7 @@ impl EventLoop { break; } - let event = tokio::time::timeout(Duration::from_secs(30), stream.next()).await; + let event = tokio::time::timeout(STREAMING_CHUNK_TIMEOUT, stream.next()).await; match event { Ok(Some(Ok(ResponseEvent::Delta(delta)))) => { diff --git a/src/cortex-tui/src/runner/event_loop/subagent.rs b/src/cortex-tui/src/runner/event_loop/subagent.rs index 9283ed7..a0b0c35 100644 --- a/src/cortex-tui/src/runner/event_loop/subagent.rs +++ b/src/cortex-tui/src/runner/event_loop/subagent.rs @@ -2,6 +2,14 @@ use std::time::{Duration, Instant}; +/// Connection timeout for subagent streaming requests. +/// Higher than main streaming to allow for subagent initialization. +const SUBAGENT_CONNECTION_TIMEOUT: Duration = Duration::from_secs(120); + +/// Per-event timeout during subagent responses. +/// Higher than main streaming to account for longer tool executions. +const SUBAGENT_EVENT_TIMEOUT: Duration = Duration::from_secs(60); + use crate::app::SubagentTaskDisplay; use crate::events::{SubagentEvent, ToolEvent}; use crate::session::StoredToolCall; @@ -210,7 +218,8 @@ impl EventLoop { }; let stream_result = - tokio::time::timeout(Duration::from_secs(120), client.complete(request)).await; + tokio::time::timeout(SUBAGENT_CONNECTION_TIMEOUT, client.complete(request)) + .await; let mut stream = match stream_result { Ok(Ok(s)) => s, @@ -252,7 +261,7 @@ impl EventLoop { let mut iteration_tool_calls: Vec<(String, String, serde_json::Value)> = Vec::new(); loop { - let event = tokio::time::timeout(Duration::from_secs(60), stream.next()).await; + let event = tokio::time::timeout(SUBAGENT_EVENT_TIMEOUT, stream.next()).await; match event { Ok(Some(Ok(ResponseEvent::Delta(delta)))) => { From 1c8f1c5c8928d5f19d0b56802af896f8bc688a7d Mon Sep 17 00:00:00 2001 From: echobt Date: Wed, 4 Feb 2026 18:16:09 +0000 Subject: [PATCH 2/2] fix(batch): implement timeout_secs parameter for overall batch timeout Address Greptile review feedback: The timeout_secs parameter was documented and accepted but never used. Now it properly wraps the entire parallel execution with a batch-level timeout, separate from the per-tool timeout_secs. - Add batch-level timeout wrapper around execute_parallel - Return descriptive error message when batch times out - Add test for batch timeout behavior --- src/cortex-engine/src/tools/handlers/batch.rs | 79 ++++++++++++++++++- 1 file changed, 75 insertions(+), 4 deletions(-) diff --git a/src/cortex-engine/src/tools/handlers/batch.rs b/src/cortex-engine/src/tools/handlers/batch.rs index e3d4542..2661f9b 100644 --- a/src/cortex-engine/src/tools/handlers/batch.rs +++ b/src/cortex-engine/src/tools/handlers/batch.rs @@ -7,6 +7,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use async_trait::async_trait; +use cortex_common::DEFAULT_BATCH_TIMEOUT_SECS; use futures::future::join_all; use serde::{Deserialize, Serialize}; use serde_json::{Value, json}; @@ -337,14 +338,30 @@ impl ToolHandler for BatchToolHandler { // Validate calls self.validate_calls(&args.calls)?; + // Determine overall batch timeout (wraps around entire parallel execution) + let batch_timeout_secs = args.timeout_secs.unwrap_or(DEFAULT_BATCH_TIMEOUT_SECS); + let batch_timeout = Duration::from_secs(batch_timeout_secs); + // Determine per-tool timeout (prevents single tool from blocking others) let tool_timeout_secs = args.tool_timeout_secs.unwrap_or(DEFAULT_TOOL_TIMEOUT_SECS); let tool_timeout = Duration::from_secs(tool_timeout_secs); - // Execute all tools in parallel - let batch_result = self - .execute_parallel(args.calls, context, tool_timeout) - .await; + // Execute all tools in parallel with overall batch timeout + let batch_result = match timeout( + batch_timeout, + self.execute_parallel(args.calls, context, tool_timeout), + ) + .await + { + Ok(result) => result, + Err(_) => { + // Batch-level timeout exceeded + return Ok(ToolResult::error(format!( + "Batch execution timed out after {}s. Consider using a longer timeout_secs or reducing the number of tools.", + batch_timeout_secs + ))); + } + }; // Format output let output = self.format_result(&batch_result); @@ -668,4 +685,58 @@ mod tests { elapsed.as_millis() ); } + + #[tokio::test] + async fn test_batch_timeout() { + // Create an executor with a slow tool + struct SlowExecutor; + + #[async_trait] + impl BatchToolExecutor for SlowExecutor { + async fn execute_tool( + &self, + _name: &str, + _arguments: Value, + _context: &ToolContext, + ) -> Result { + // Sleep longer than batch timeout + tokio::time::sleep(Duration::from_secs(5)).await; + Ok(ToolResult::success("Done")) + } + + fn has_tool(&self, _name: &str) -> bool { + true + } + } + + let executor: Arc = Arc::new(SlowExecutor); + let handler = BatchToolHandler::new(executor); + let context = ToolContext::new(PathBuf::from(".")); + + // Use a very short batch timeout (1 second) to test timeout behavior + let args = json!({ + "calls": [ + {"tool": "SlowTool", "arguments": {}} + ], + "timeout_secs": 1 + }); + + let start = Instant::now(); + let result = handler.execute(args, &context).await; + let elapsed = start.elapsed(); + + assert!(result.is_ok()); + let tool_result = result.unwrap(); + + // Should timeout quickly (around 1 second) + assert!( + elapsed.as_secs() < 3, + "Batch should have timed out quickly, but took {}s", + elapsed.as_secs() + ); + + // Should have timed out + assert!(!tool_result.success); + assert!(tool_result.output.contains("timed out")); + } }