Skip to content
97 changes: 95 additions & 2 deletions cortex/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,14 @@ def doctor(self):
doctor = SystemDoctor()
return doctor.run_checks()

def install(self, software: str, execute: bool = False, dry_run: bool = False):
def install(
self,
software: str,
execute: bool = False,
dry_run: bool = False,
parallel: bool = False,
):
# Validate input first
is_valid, error = validate_install_request(software)
if not is_valid:
self._print_error(error)
Expand Down Expand Up @@ -371,6 +378,82 @@ def progress_callback(current, total, step):

print("\nExecuting commands...")

if parallel:
import asyncio

from cortex.install_parallel import run_parallel_install

def parallel_log_callback(message: str, level: str = "info"):
if level == "success":
cx_print(f" ✅ {message}", "success")
elif level == "error":
cx_print(f" ❌ {message}", "error")
else:
cx_print(f" ℹ {message}", "info")
Comment on lines +386 to +392
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Add type hints to inline callback function.

The parallel_log_callback function lacks type hints. Per coding guidelines, type hints are required in Python code.

🔎 Add type hints
-                    def parallel_log_callback(message: str, level: str = "info"):
+                    def parallel_log_callback(message: str, level: str = "info") -> None:
                         if level == "success":
                             cx_print(f"  ✅ {message}", "success")
                         elif level == "error":
                             cx_print(f"  ❌ {message}", "error")
                         else:
                             cx_print(f"  ℹ {message}", "info")

As per coding guidelines, type hints are required for all functions.

🤖 Prompt for AI Agents
In cortex/cli.py around lines 386 to 392, the inline callback function
parallel_log_callback is missing type hints; update its signature to include
parameter and return type annotations (e.g., message: str, level: str or a more
specific typing.Literal['info','success','error']) and add an explicit return
type of None; if you use Literal import it from typing (or typing_extensions for
older Python), leaving the body unchanged.


try:
success, parallel_tasks = asyncio.run(
run_parallel_install(
commands=commands,
descriptions=[f"Step {i + 1}" for i in range(len(commands))],
timeout=300,
stop_on_error=True,
log_callback=parallel_log_callback,
)
)

total_duration = 0.0
if parallel_tasks:
max_end = max(
(t.end_time for t in parallel_tasks if t.end_time is not None),
default=None,
)
min_start = min(
(t.start_time for t in parallel_tasks if t.start_time is not None),
default=None,
)
if max_end is not None and min_start is not None:
total_duration = max_end - min_start

if success:
self._print_success(f"{software} installed successfully!")
print(f"\nCompleted in {total_duration:.2f} seconds (parallel mode)")

if install_id:
history.update_installation(install_id, InstallationStatus.SUCCESS)
print(f"\n📝 Installation recorded (ID: {install_id})")
print(f" To rollback: cortex rollback {install_id}")

return 0

failed_tasks = [
t for t in parallel_tasks if getattr(t.status, "value", "") == "failed"
]
error_msg = failed_tasks[0].error if failed_tasks else "Installation failed"

if install_id:
history.update_installation(
install_id,
InstallationStatus.FAILED,
error_msg,
)

self._print_error("Installation failed")
if error_msg:
print(f" Error: {error_msg}", file=sys.stderr)
if install_id:
print(f"\n📝 Installation recorded (ID: {install_id})")
print(f" View details: cortex history show {install_id}")
return 1

except Exception as e:
if install_id:
history.update_installation(
install_id, InstallationStatus.FAILED, str(e)
)
self._print_error(f"Parallel execution failed: {str(e)}")
return 1

coordinator = InstallationCoordinator(
commands=commands,
descriptions=[f"Step {i+1}" for i in range(len(commands))],
Expand Down Expand Up @@ -751,6 +834,11 @@ def main():
install_parser.add_argument("software", type=str, help="Software to install")
install_parser.add_argument("--execute", action="store_true", help="Execute commands")
install_parser.add_argument("--dry-run", action="store_true", help="Show commands only")
install_parser.add_argument(
"--parallel",
action="store_true",
help="Enable parallel execution for multi-step installs",
)

# History command
history_parser = subparsers.add_parser("history", help="View history")
Expand Down Expand Up @@ -824,7 +912,12 @@ def main():
elif args.command == "status":
return cli.status()
elif args.command == "install":
return cli.install(args.software, execute=args.execute, dry_run=args.dry_run)
return cli.install(
args.software,
execute=args.execute,
dry_run=args.dry_run,
parallel=args.parallel,
)
elif args.command == "history":
return cli.history(limit=args.limit, status=args.status, show_id=args.show_id)
elif args.command == "rollback":
Expand Down
18 changes: 2 additions & 16 deletions cortex/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,9 @@
from enum import Enum
from typing import Any

logger = logging.getLogger(__name__)
from cortex.validators import DANGEROUS_PATTERNS

# Dangerous patterns that should never be executed
DANGEROUS_PATTERNS = [
r"rm\s+-rf\s+[/\*]",
r"rm\s+--no-preserve-root",
r"dd\s+if=.*of=/dev/",
r"curl\s+.*\|\s*sh",
r"curl\s+.*\|\s*bash",
r"wget\s+.*\|\s*sh",
r"wget\s+.*\|\s*bash",
r"\beval\s+",
r"base64\s+-d\s+.*\|",
r">\s*/etc/",
r"chmod\s+777",
r"chmod\s+\+s",
]
logger = logging.getLogger(__name__)


class StepStatus(Enum):
Expand Down
30 changes: 22 additions & 8 deletions cortex/hardware_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
import json
import logging
import os
import platform
import re
import shutil
import subprocess
from dataclasses import asdict, dataclass, field
from enum import Enum
Expand Down Expand Up @@ -191,6 +193,13 @@ def __init__(self, use_cache: bool = True):
self.use_cache = use_cache
self._info: SystemInfo | None = None

def _uname(self):
"""Return uname-like info with nodename/release/machine attributes."""
uname_fn = getattr(os, "uname", None)
if callable(uname_fn):
return uname_fn()
return platform.uname()

def detect(self, force_refresh: bool = False) -> SystemInfo:
"""
Detect all hardware information.
Expand Down Expand Up @@ -299,14 +308,13 @@ def _detect_system(self, info: SystemInfo):
"""Detect basic system information."""
# Hostname
try:
info.hostname = os.uname().nodename
info.hostname = self._uname().nodename
except:
info.hostname = "unknown"

# Kernel
with contextlib.suppress(builtins.BaseException):
info.kernel_version = os.uname().release

info.kernel_version = self._uname().release
# Distro
try:
if Path("/etc/os-release").exists():
Expand All @@ -329,6 +337,7 @@ def _detect_system(self, info: SystemInfo):
def _detect_cpu(self, info: SystemInfo):
"""Detect CPU information."""
try:
uname = self._uname()
with open("/proc/cpuinfo") as f:
content = f.read()

Expand All @@ -342,7 +351,7 @@ def _detect_cpu(self, info: SystemInfo):
info.cpu.vendor = CPUVendor.INTEL
elif "AMD" in info.cpu.model:
info.cpu.vendor = CPUVendor.AMD
elif "ARM" in info.cpu.model or "aarch" in os.uname().machine:
elif "ARM" in info.cpu.model or "aarch" in uname.machine:
info.cpu.vendor = CPUVendor.ARM

# Cores (physical)
Expand All @@ -362,8 +371,7 @@ def _detect_cpu(self, info: SystemInfo):
info.cpu.frequency_mhz = float(match.group(1))

# Architecture
info.cpu.architecture = os.uname().machine

info.cpu.architecture = uname.machine
# Features
match = re.search(r"flags\s*:\s*(.+)", content)
if match:
Expand Down Expand Up @@ -611,8 +619,14 @@ def _has_nvidia_gpu(self) -> bool:
def _get_disk_free_gb(self) -> float:
"""Quick disk free space on root."""
try:
statvfs = os.statvfs("/")
return round((statvfs.f_frsize * statvfs.f_bavail) / (1024**3), 1)
statvfs_fn = getattr(os, "statvfs", None)
if callable(statvfs_fn):
statvfs = statvfs_fn("/")
return round((statvfs.f_frsize * statvfs.f_bavail) / (1024**3), 1)

root_path = os.path.abspath(os.sep)
_total, _used, free = shutil.disk_usage(root_path)
return round(free / (1024**3), 1)
except:
return 0.0

Expand Down
Loading
Loading