Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cortex/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1670,6 +1670,7 @@ def parallel_log_callback(message: str, level: str = "info"):
timeout=300,
stop_on_error=True,
progress_callback=progress_callback,
max_retries=5,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not use magic numbers; add a named constant instead, something like `DEFAULT_MAX_RETRIES = 5`.

)

result = coordinator.execute()
Expand Down
22 changes: 20 additions & 2 deletions cortex/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from enum import Enum
from typing import Any

from cortex.utils.retry import SmartRetry
from cortex.validators import DANGEROUS_PATTERNS

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -60,13 +61,15 @@ def __init__(
enable_rollback: bool = False,
log_file: str | None = None,
progress_callback: Callable[[int, int, InstallationStep], None] | None = None,
max_retries: int = 5,
):
"""Initialize an installation run with optional logging and rollback."""
self.timeout = timeout
self.stop_on_error = stop_on_error
self.enable_rollback = enable_rollback
self.log_file = log_file
self.progress_callback = progress_callback
self.max_retries = max_retries

if descriptions and len(descriptions) != len(commands):
raise ValueError("Number of descriptions must match number of commands")
Expand All @@ -90,6 +93,7 @@ def from_plan(
enable_rollback: bool | None = None,
log_file: str | None = None,
progress_callback: Callable[[int, int, InstallationStep], None] | None = None,
max_retries: int = 5,
) -> "InstallationCoordinator":
"""Create a coordinator from a structured plan produced by an LLM.

Expand Down Expand Up @@ -124,6 +128,7 @@ def from_plan(
),
log_file=log_file,
progress_callback=progress_callback,
max_retries=max_retries,
)

for rollback_cmd in rollback_commands:
Expand Down Expand Up @@ -174,14 +179,27 @@ def _execute_command(self, step: InstallationStep) -> bool:
self._log(f"Command blocked: {step.command} - {error}")
return False

try:
def run_cmd() -> subprocess.CompletedProcess[str]:
# Use shell=True carefully - commands are validated first
# For complex shell commands (pipes, redirects), shell=True is needed
# Simple commands could use shlex.split() with shell=False
result = subprocess.run(
return subprocess.run(
step.command, shell=True, capture_output=True, text=True, timeout=self.timeout
)

def status_callback(msg: str) -> None:
self._log(msg)
# Also print to stdout so the user sees the retry happening
print(msg)
Comment on lines +190 to +193
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If progress_callback is also set, user may see duplicate output. Consider checking if a callback exists before printing.


retry_handler = SmartRetry(
max_retries=self.max_retries,
status_callback=status_callback,
)

try:
result = retry_handler.run(run_cmd)

step.return_code = result.returncode
step.output = result.stdout
step.error = result.stderr
Expand Down
126 changes: 126 additions & 0 deletions cortex/utils/retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import logging
import time
from collections.abc import Callable
from typing import Any

from cortex.error_parser import ErrorCategory, ErrorParser
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ErrorParser class is unused, remove that.


logger = logging.getLogger(__name__)


class SmartRetry:
"""
Implements smart retry logic with exponential backoff.
Uses ErrorParser to distinguish between transient and permanent errors.
"""

def __init__(
self,
max_retries: int = 5,
backoff_factor: float = 1.0,
status_callback: Callable[[str], None] | None = None,
):
if not isinstance(max_retries, int) or max_retries < 0:
raise ValueError("max_retries must be a non-negative integer")
if not isinstance(backoff_factor, (int, float)) or backoff_factor < 0:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should reject backoff_factor=0 as well since it defeats the purpose of backoff.

raise ValueError("backoff_factor must be a non-negative number")

self.max_retries = max_retries
self.backoff_factor = float(backoff_factor)
self.status_callback = status_callback
self.error_parser = ErrorParser()

def run(self, func: Callable[[], Any]) -> Any:
"""
Run a function with smart retry logic.
Args:
func: The function to execute. Expected to return a result object
that has `returncode`, `stdout`, and `stderr` attributes
(like subprocess.CompletedProcess), or raise an exception.
Returns:
The result of the function call.
"""
attempt = 0
last_exception = None
last_result = None

while attempt <= self.max_retries:
try:
result = func()
last_result = result

# If result indicates success (returncode 0), return immediately
if hasattr(result, "returncode") and result.returncode == 0:
return result

# If result indicates failure, analyze it
error_msg = ""
if hasattr(result, "stderr") and result.stderr:
error_msg = result.stderr
elif hasattr(result, "stdout") and result.stdout:
error_msg = result.stdout
Comment on lines +60 to +63
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stdout may contain success output, not error info. This could cause false retry classification. So fix it.


if not self._should_retry(error_msg):
return result

except Exception as e:
last_exception = e
if not self._should_retry(str(e)):
raise

# If we are here, we need to retry (unless max retries reached)
if attempt == self.max_retries:
break

attempt += 1
sleep_time = self.backoff_factor * (2 ** (attempt - 1))

msg = f"⚠️ Transient error detected. Retrying in {sleep_time}s... (Attempt {attempt}/{self.max_retries})"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On first retry, attempt=1, so message shows "Attempt 1/5" but it's actually the 2nd execution. Consider clarifying: "Retry 1/5" instead.

logger.warning(msg)
if self.status_callback:
self.status_callback(msg)

time.sleep(sleep_time)

if last_exception:
raise last_exception
return last_result

def _should_retry(self, error_message: str) -> bool:
"""
Determine if we should retry based on the error message.
"""
if not error_message:
# If no error message, assume it's a generic failure that might be transient
return True
Comment on lines +95 to +97
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is risky. A command could fail with returncode != 0 and empty stderr/stdout for permanent reasons. Consider logging a warning here.


analysis = self.error_parser.parse_error(error_message)
category = analysis.primary_category

# If the error is explicitly marked as not fixable, fail fast
if not analysis.is_fixable:
return False

# Retry on network errors, lock errors, or unknown errors (conservative)
if category in [
ErrorCategory.NETWORK_ERROR,
ErrorCategory.LOCK_ERROR,
ErrorCategory.UNKNOWN,
]:
return True

# Fail fast on permanent errors
if category in [
ErrorCategory.PERMISSION_DENIED,
ErrorCategory.PACKAGE_NOT_FOUND,
ErrorCategory.CONFIGURATION_ERROR,
ErrorCategory.DEPENDENCY_MISSING,
ErrorCategory.CONFLICT,
ErrorCategory.DISK_SPACE,
]:
return False

# Default to retry for safety if not explicitly categorized as permanent
return True
1 change: 1 addition & 0 deletions docs/COMMANDS.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ cortex install "python3 with pip and virtualenv" --execute
- Without `--execute`, Cortex only shows the commands it would run
- The `--dry-run` flag is recommended for first-time use to verify commands
- Installation is recorded in history for potential rollback
- **Smart Retry Logic**: Cortex automatically detects transient failures (like network timeouts) and retries commands with exponential backoff (up to 5 retries after the initial attempt). Permanent errors (like permission denied) fail immediately.

---

Expand Down
35 changes: 18 additions & 17 deletions tests/test_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,17 @@ def test_step_duration(self):
self.assertEqual(step.duration(), 5.5)


@patch("time.sleep")
class TestInstallationCoordinator(unittest.TestCase):
def test_initialization(self):
def test_initialization(self, mock_sleep):
commands = ["echo 1", "echo 2"]
coordinator = InstallationCoordinator(commands)

self.assertEqual(len(coordinator.steps), 2)
self.assertEqual(coordinator.steps[0].command, "echo 1")
self.assertEqual(coordinator.steps[1].command, "echo 2")

def test_from_plan_initialization(self):
def test_from_plan_initialization(self, mock_sleep):
plan = [
{"command": "echo 1", "description": "First step"},
{"command": "echo 2", "rollback": "echo rollback"},
Expand All @@ -54,23 +55,23 @@ def test_from_plan_initialization(self):
self.assertTrue(coordinator.enable_rollback)
self.assertEqual(coordinator.rollback_commands, ["echo rollback"])

def test_initialization_with_descriptions(self):
def test_initialization_with_descriptions(self, mock_sleep):
commands = ["echo 1", "echo 2"]
descriptions = ["First", "Second"]
coordinator = InstallationCoordinator(commands, descriptions)

self.assertEqual(coordinator.steps[0].description, "First")
self.assertEqual(coordinator.steps[1].description, "Second")

def test_initialization_mismatched_descriptions(self):
def test_initialization_mismatched_descriptions(self, mock_sleep):
commands = ["echo 1", "echo 2"]
descriptions = ["First"]

with self.assertRaises(ValueError):
InstallationCoordinator(commands, descriptions)

@patch("subprocess.run")
def test_execute_single_success(self, mock_run):
def test_execute_single_success(self, mock_run, mock_sleep):
mock_result = Mock()
mock_result.returncode = 0
mock_result.stdout = "success"
Expand All @@ -85,7 +86,7 @@ def test_execute_single_success(self, mock_run):
self.assertEqual(result.steps[0].status, StepStatus.SUCCESS)

@patch("subprocess.run")
def test_execute_single_failure(self, mock_run):
def test_execute_single_failure(self, mock_run, mock_sleep):
mock_result = Mock()
mock_result.returncode = 1
mock_result.stdout = ""
Expand All @@ -100,7 +101,7 @@ def test_execute_single_failure(self, mock_run):
self.assertEqual(result.steps[0].status, StepStatus.FAILED)

@patch("subprocess.run")
def test_execute_multiple_success(self, mock_run):
def test_execute_multiple_success(self, mock_run, mock_sleep):
mock_result = Mock()
mock_result.returncode = 0
mock_result.stdout = "success"
Expand All @@ -115,7 +116,7 @@ def test_execute_multiple_success(self, mock_run):
self.assertTrue(all(s.status == StepStatus.SUCCESS for s in result.steps))

@patch("subprocess.run")
def test_execute_stop_on_error(self, mock_run):
def test_execute_stop_on_error(self, mock_run, mock_sleep):
def side_effect(*args, **kwargs):
cmd = args[0] if args else kwargs.get("shell")
if "fail" in str(cmd):
Expand Down Expand Up @@ -143,7 +144,7 @@ def side_effect(*args, **kwargs):
self.assertEqual(result.steps[2].status, StepStatus.SKIPPED)

@patch("subprocess.run")
def test_execute_continue_on_error(self, mock_run):
def test_execute_continue_on_error(self, mock_run, mock_sleep):
def side_effect(*args, **kwargs):
cmd = args[0] if args else kwargs.get("shell")
if "fail" in str(cmd):
Expand All @@ -170,7 +171,7 @@ def side_effect(*args, **kwargs):
self.assertEqual(result.steps[2].status, StepStatus.SUCCESS)

@patch("subprocess.run")
def test_timeout_handling(self, mock_run):
def test_timeout_handling(self, mock_run, mock_sleep):
mock_run.side_effect = Exception("Timeout")

coordinator = InstallationCoordinator(["sleep 1000"], timeout=1)
Expand All @@ -179,7 +180,7 @@ def test_timeout_handling(self, mock_run):
self.assertFalse(result.success)
self.assertEqual(result.steps[0].status, StepStatus.FAILED)

def test_progress_callback(self):
def test_progress_callback(self, mock_sleep):
callback_calls = []

def callback(current, total, step):
Expand All @@ -199,7 +200,7 @@ def callback(current, total, step):
self.assertEqual(callback_calls[0], (1, 2, "echo 1"))
self.assertEqual(callback_calls[1], (2, 2, "echo 2"))

def test_log_file(self):
def test_log_file(self, mock_sleep):
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".log") as f:
log_file = f.name

Expand All @@ -223,7 +224,7 @@ def test_log_file(self):
os.unlink(log_file)

@patch("subprocess.run")
def test_rollback(self, mock_run):
def test_rollback(self, mock_run, mock_sleep):
mock_result = Mock()
mock_result.returncode = 1
mock_result.stdout = ""
Expand All @@ -238,7 +239,7 @@ def test_rollback(self, mock_run):
self.assertGreaterEqual(mock_run.call_count, 2)

@patch("subprocess.run")
def test_verify_installation(self, mock_run):
def test_verify_installation(self, mock_run, mock_sleep):
mock_result = Mock()
mock_result.returncode = 0
mock_result.stdout = "Docker version 20.10.0"
Expand All @@ -252,7 +253,7 @@ def test_verify_installation(self, mock_run):

self.assertTrue(verify_results["docker --version"])

def test_get_summary(self):
def test_get_summary(self, mock_sleep):
with patch("subprocess.run") as mock_run:
mock_result = Mock()
mock_result.returncode = 0
Expand All @@ -270,7 +271,7 @@ def test_get_summary(self):
self.assertEqual(summary["failed"], 0)
self.assertEqual(summary["skipped"], 0)

def test_export_log(self):
def test_export_log(self, mock_sleep):
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f:
export_file = f.name

Expand Down Expand Up @@ -299,7 +300,7 @@ def test_export_log(self):
os.unlink(export_file)

@patch("subprocess.run")
def test_step_timing(self, mock_run):
def test_step_timing(self, mock_run, mock_sleep):
mock_result = Mock()
mock_result.returncode = 0
mock_result.stdout = "success"
Expand Down
Loading