diff --git a/cortex/cli.py b/cortex/cli.py index 996aec2b..2205137b 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -8,6 +8,11 @@ from cortex.branding import VERSION, console, cx_header, cx_print, show_banner from cortex.coordinator import InstallationCoordinator, StepStatus +from cortex.installation_history import ( + InstallationHistory, + InstallationStatus, + InstallationType, +) from cortex.demo import run_demo from cortex.installation_history import InstallationHistory, InstallationStatus, InstallationType from cortex.llm.interpreter import CommandInterpreter @@ -101,10 +106,9 @@ def _clear_line(self): sys.stdout.write("\r\033[K") sys.stdout.flush() - # --- New Notification Method --- + # --- Notification Method --- def notify(self, args): """Handle notification commands""" - # Addressing CodeRabbit feedback: Handle missing subcommand gracefully if not args.notify_action: self._print_error("Please specify a subcommand (config/enable/disable/dnd/send)") return 1 @@ -127,16 +131,14 @@ def notify(self, args): elif args.notify_action == "enable": mgr.config["enabled"] = True - # Addressing CodeRabbit feedback: Ideally should use a public method instead of private _save_config, - # but keeping as is for a simple fix (or adding a save method to NotificationManager would be best). - mgr._save_config() + mgr.save_config() self._print_success("Notifications enabled") return 0 elif args.notify_action == "disable": mgr.config["enabled"] = False - mgr._save_config() - cx_print("Notifications disabled (Critical alerts will still show)", "warning") + mgr.save_config() + self._print_success("Notifications disabled (Critical alerts will still show)") return 0 elif args.notify_action == "dnd": @@ -144,7 +146,7 @@ def notify(self, args): self._print_error("Please provide start and end times (HH:MM)") return 1 - # Addressing CodeRabbit feedback: Add time format validation + # Add time format validation try: datetime.strptime(args.start, "%H:%M") datetime.strptime(args.end, "%H:%M") @@ -154,7 +156,7 @@ def notify(self, args): mgr.config["dnd_start"] = args.start mgr.config["dnd_end"] = args.end - mgr._save_config() + mgr.save_config() self._print_success(f"DND Window updated: {args.start} - {args.end}") return 0 @@ -170,6 +172,76 @@ def notify(self, args): self._print_error("Unknown notify command") return 1 + # --- Health Command --- + def health(self, args): + """Run system health checks and show recommendations. + + Args: + args: Parsed command line arguments. + + Returns: + int: 0 on success, 1 on failure. + """ + try: + from cortex.health.monitor import HealthMonitor + + self._print_status("šŸ”", "Running system health checks...") + monitor = HealthMonitor() + report = monitor.run_all() + + # --- Display Results --- + score = report["total_score"] + + # Color code the score + score_color = "green" + if score < 60: + score_color = "red" + elif score < 80: + score_color = "yellow" + + console.print() + console.print( + f"šŸ“Š [bold]System Health Score:[/bold] [{score_color}]{score}/100[/{score_color}]" + ) + console.print() + + console.print("[bold]Factors:[/bold]") + recommendations = [] + + for res in report["results"]: + status_icon = "āœ…" + if res["status"] == "WARNING": + status_icon = "āš ļø " + elif res["status"] == "CRITICAL": + status_icon = "āŒ" + + console.print( + f" {status_icon} {res['name']:<15}: {res['score']}/100 ({res['details']})" + ) + + if res["recommendation"]: + recommendations.append(res["recommendation"]) + + console.print() + + if recommendations: + console.print("[bold]Recommendations:[/bold]") + for i, rec in enumerate(recommendations, 1): + console.print(f" {i}. {rec}") + + console.print() + console.print("[dim]Run suggested commands manually to improve your score.[/dim]") + else: + self._print_success("System is in excellent health! No actions needed.") + + return 0 + except ImportError as e: + self._print_error(f"Health module not available: {e}") + return 1 + except Exception as e: + self._print_error(f"Health check failed: {e}") + return 1 + # ------------------------------- def demo(self): """ @@ -462,7 +534,7 @@ def parallel_log_callback(message: str, level: str = "info"): coordinator = InstallationCoordinator( commands=commands, - descriptions=[f"Step {i+1}" for i in range(len(commands))], + descriptions=[f"Step {i + 1}" for i in range(len(commands))], timeout=300, stop_on_error=True, progress_callback=progress_callback, @@ -591,7 +663,7 @@ def history(self, limit: int = 20, status: str | None = None, show_id: str | Non date = r.timestamp[:19].replace("T", " ") packages = ", ".join(r.packages[:2]) if len(r.packages) > 2: - packages += f" +{len(r.packages)-2}" + packages += f" +{len(r.packages) - 2}" print( f"{r.id:<18} {date:<20} {r.operation_type.value:<12} {packages:<30} {r.status.value:<15}" @@ -767,9 +839,10 @@ def show_rich_help(): table.add_row("install ", "Install software") table.add_row("history", "View history") table.add_row("rollback ", "Undo installation") - table.add_row("notify", "Manage desktop notifications") # Added this line + table.add_row("notify", "Manage desktop notifications") table.add_row("cache stats", "Show LLM cache statistics") table.add_row("stack ", "Install the stack") + table.add_row("health", "Check system health score") table.add_row("doctor", "System health check") console.print(table) @@ -858,7 +931,7 @@ def main(): edit_pref_parser.add_argument("key", nargs="?") edit_pref_parser.add_argument("value", nargs="?") - # --- New Notify Command --- + # --- Notify Command --- notify_parser = subparsers.add_parser("notify", help="Manage desktop notifications") notify_subs = notify_parser.add_subparsers(dest="notify_action", help="Notify actions") @@ -875,6 +948,9 @@ def main(): send_parser.add_argument("--title", default="Cortex Notification") send_parser.add_argument("--level", choices=["low", "normal", "critical"], default="normal") send_parser.add_argument("--actions", nargs="*", help="Action buttons") + + # --- Health Command --- + subparsers.add_parser("health", help="Check system health score") # -------------------------- # Stack command @@ -924,9 +1000,10 @@ def main(): return cli.check_pref(key=args.key) elif args.command == "edit-pref": return cli.edit_pref(action=args.action, key=args.key, value=args.value) - # Handle the new notify command elif args.command == "notify": return cli.notify(args) + elif args.command == "health": + return cli.health(args) elif args.command == "stack": return cli.stack(args) elif args.command == "doctor": diff --git a/cortex/health/__init__.py b/cortex/health/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cortex/health/checks/disk.py b/cortex/health/checks/disk.py new file mode 100644 index 00000000..a45651f0 --- /dev/null +++ b/cortex/health/checks/disk.py @@ -0,0 +1,61 @@ +import shutil + +from ..monitor import CheckResult, HealthCheck + + +class DiskCheck(HealthCheck): + """Check root filesystem disk usage.""" + + def run(self) -> CheckResult: + """Calculate disk usage percentage. + + Returns: + CheckResult based on usage thresholds. + """ + try: + # Use _ for unused variable (free space) + total, used, _ = shutil.disk_usage("/") + usage_percent = (used / total) * 100 + except Exception as e: + return CheckResult( + name="Disk Usage", + category="disk", + score=0, + status="CRITICAL", + details=f"Check failed: {e}", + recommendation="Check disk mounts and permissions", + weight=0.20, + ) + + # Explicit early returns to avoid static analysis confusion + if usage_percent > 90: + return CheckResult( + name="Disk Usage", + category="disk", + score=0, + status="CRITICAL", + details=f"{usage_percent:.1f}% used", + recommendation="Clean up disk space immediately", + weight=0.20, + ) + + if usage_percent > 80: + return CheckResult( + name="Disk Usage", + category="disk", + score=50, + status="WARNING", + details=f"{usage_percent:.1f}% used", + recommendation="Consider cleaning up disk space", + weight=0.20, + ) + + return CheckResult( + name="Disk Usage", + category="disk", + score=100, + status="OK", + details=f"{usage_percent:.1f}% used", + recommendation=None, + weight=0.20, + ) diff --git a/cortex/health/checks/performance.py b/cortex/health/checks/performance.py new file mode 100644 index 00000000..9bd555e0 --- /dev/null +++ b/cortex/health/checks/performance.py @@ -0,0 +1,72 @@ +import multiprocessing +import os + +from ..monitor import CheckResult, HealthCheck + + +class PerformanceCheck(HealthCheck): + """Check system performance metrics including CPU load and memory usage.""" + + def run(self) -> CheckResult: + """Check system load and memory usage. + + Returns: + CheckResult with performance score. + """ + score = 100 + issues = [] + rec = None + + # 1. Load Average (1min) + try: + load1, _, _ = os.getloadavg() + cores = multiprocessing.cpu_count() + # Load ratio against core count + load_ratio = load1 / cores + + if load_ratio > 1.0: + score -= 50 + issues.append(f"High Load ({load1:.2f})") + rec = "Check top processes" + except OSError: + pass # Skip on Windows etc. + + # 2. Memory Usage (Linux /proc/meminfo) + try: + with open("/proc/meminfo") as f: + meminfo = {} + for line in f: + parts = line.split(":") + if len(parts) == 2: + meminfo[parts[0].strip()] = int(parts[1].strip().split()[0]) + + if "MemTotal" in meminfo and "MemAvailable" in meminfo: + total = meminfo["MemTotal"] + avail = meminfo["MemAvailable"] + used_percent = ((total - avail) / total) * 100 + + if used_percent > 80: + penalty = int(used_percent - 80) + score -= penalty + issues.append(f"High Memory ({used_percent:.0f}%)") + except FileNotFoundError: + pass # Non-Linux systems + + # Summary of results + status = "OK" + if score < 50: + status = "CRITICAL" + elif score < 90: + status = "WARNING" + + details = ", ".join(issues) if issues else "Optimal" + + return CheckResult( + name="System Load", + category="performance", + score=max(0, score), + status=status, + details=details, + recommendation=rec, + weight=0.20, # 20% + ) diff --git a/cortex/health/checks/security.py b/cortex/health/checks/security.py new file mode 100644 index 00000000..6924eeea --- /dev/null +++ b/cortex/health/checks/security.py @@ -0,0 +1,106 @@ +import os +import subprocess + +from ..monitor import CheckResult, HealthCheck + +# Command constants (full paths for security - avoids PATH manipulation attacks) +SYSTEMCTL_CMD = "/usr/bin/systemctl" +UFW_SERVICE = "ufw" +SSH_CONFIG_PATH = "/etc/ssh/sshd_config" + + +class SecurityCheck(HealthCheck): + """Check system security posture including firewall and SSH configuration. + + This check evaluates: + - UFW firewall status (0 points if inactive) + - SSH root login configuration (-50 points if enabled) + + Attributes: + None + + Example: + >>> check = SecurityCheck() + >>> result = check.run() + >>> print(result.score) + 100 + """ + + def run(self) -> CheckResult: + """Execute security checks and return aggregated result. + + Returns: + CheckResult: Security posture score and recommendations. + """ + score = 100 + issues = [] + recommendations = [] + + # 1. Firewall (UFW) Check + ufw_active = False + try: + # Add timeout to prevent hanging (Fixes Reliability Issue) + res = subprocess.run( + [SYSTEMCTL_CMD, "is-active", UFW_SERVICE], capture_output=True, text=True, timeout=5 + ) + # Fix: Use exact match to avoid matching "inactive" which contains "active" + if res.returncode == 0 and res.stdout.strip() == "active": + ufw_active = True + except subprocess.TimeoutExpired: + pass # Command timed out, treat as inactive or unavailable + except FileNotFoundError: + pass # Environment without systemctl (e.g., Docker or non-systemd) + except OSError: + pass # Other OS-level errors + + if not ufw_active: + score = 0 # Spec: 0 points if Firewall is inactive + issues.append("Firewall Inactive") + recommendations.append("Enable UFW Firewall") + + # 2. SSH Root Login Check + self._check_ssh_root_login(issues, recommendations) + if "Root SSH Allowed" in issues: + score -= 50 + + status = "OK" + if score < 50: + status = "CRITICAL" + elif score < 100: + status = "WARNING" + + return CheckResult( + name="Security Posture", + category="security", + score=max(0, score), + status=status, + details=", ".join(issues) if issues else "Secure", + recommendation=", ".join(recommendations) if recommendations else None, + weight=0.35, + ) + + def _check_ssh_root_login(self, issues: list[str], recommendations: list[str]) -> None: + """Check if SSH root login is enabled. + + Args: + issues: List to append issue descriptions to. + recommendations: List to append recommendations to. + """ + try: + if not os.path.exists(SSH_CONFIG_PATH): + return + + with open(SSH_CONFIG_PATH) as f: + for line in f: + stripped = line.strip() + # Check for uncommented PermitRootLogin yes + if stripped.startswith("PermitRootLogin"): + parts = stripped.split() + if len(parts) >= 2 and parts[1] == "yes": + issues.append("Root SSH Allowed") + recommendations.append("Disable SSH Root Login in sshd_config") + return + except PermissionError: + pass # Cannot read config, skip check + except OSError: + pass # Other file system errors diff --git a/cortex/health/checks/updates.py b/cortex/health/checks/updates.py new file mode 100644 index 00000000..c04564e6 --- /dev/null +++ b/cortex/health/checks/updates.py @@ -0,0 +1,99 @@ +import subprocess + +from ..monitor import CheckResult, HealthCheck + +# Command constants (full paths for security) +APT_CMD = "/usr/bin/apt" + + +class UpdateCheck(HealthCheck): + """Check for pending system updates and security patches. + + This check evaluates the number of available package updates + and applies score penalties accordingly: + - Regular packages: -2 points each + - Security updates: -10 points each + + Attributes: + None + """ + + def run(self) -> CheckResult: + """Check for available updates using apt. + + Returns: + CheckResult with score based on pending updates. + """ + score = 100 + pkg_count = 0 + sec_count = 0 + + try: + # Add timeout to prevent hangs + res = subprocess.run( + [APT_CMD, "list", "--upgradable"], capture_output=True, text=True, timeout=30 + ) + lines = res.stdout.splitlines() + + # apt list output header usually takes first line + for line in lines[1:]: + if line.strip(): + if "security" in line.lower(): + sec_count += 1 + else: + pkg_count += 1 + + # Scoring + score -= pkg_count * 2 + score -= sec_count * 10 + + except subprocess.TimeoutExpired as e: + return CheckResult( + name="System Updates", + category="updates", + score=0, + status="CRITICAL", + details=f"Check timed out: {e}", + recommendation="Verify package manager configuration", + weight=0.25, + ) + except FileNotFoundError: + return CheckResult( + name="System Updates", + category="updates", + score=0, + status="CRITICAL", + details="apt command not found", + recommendation="This check requires apt package manager", + weight=0.25, + ) + except OSError as e: + return CheckResult( + name="System Updates", + category="updates", + score=0, + status="CRITICAL", + details=f"Check failed: {e}", + recommendation="Verify package manager configuration", + weight=0.25, + ) + + status = "OK" + if score < 50: + status = "CRITICAL" + elif score < 90: + status = "WARNING" + + details = f"{pkg_count} packages, {sec_count} security updates pending" + if pkg_count == 0 and sec_count == 0: + details = "System up to date" + + return CheckResult( + name="System Updates", + category="updates", + score=max(0, score), + status=status, + details=details, + recommendation="Run 'apt upgrade'" if score < 100 else None, + weight=0.25, + ) diff --git a/cortex/health/monitor.py b/cortex/health/monitor.py new file mode 100644 index 00000000..627afb2d --- /dev/null +++ b/cortex/health/monitor.py @@ -0,0 +1,132 @@ +import json +import time +from abc import ABC, abstractmethod +from dataclasses import dataclass +from pathlib import Path + +from rich.console import Console + +console = Console() + + +@dataclass +class CheckResult: + """Data class to hold the result of each check.""" + + name: str # Item name (e.g. "Disk Space") + category: str # Category (security, updates, performance, disk) + score: int # Score 0-100 + status: str # "OK", "WARNING", "CRITICAL" + details: str # Detailed message + recommendation: str | None = None # Recommended action (if any) + weight: float = 1.0 # Weight for weighted average + + +class HealthCheck(ABC): + """Base class inherited by all health check modules.""" + + @abstractmethod + def run(self) -> CheckResult: + """Execute the check and return a result.""" + pass + + +class HealthMonitor: + """Main engine for system health monitoring. + + Manages registration of health checks, execution, score aggregation, + and history persistence. + """ + + def __init__(self) -> None: + """Initialize the health monitor and register default checks.""" + self.history_file = Path.home() / ".cortex" / "health_history.json" + self.history_file.parent.mkdir(exist_ok=True) + self.checks: list[HealthCheck] = [] + + # Register each check here + # (Import here to prevent circular references) + from .checks.disk import DiskCheck + from .checks.performance import PerformanceCheck + from .checks.security import SecurityCheck + from .checks.updates import UpdateCheck + + self.register_check(SecurityCheck()) + self.register_check(UpdateCheck()) + self.register_check(PerformanceCheck()) + self.register_check(DiskCheck()) + + def register_check(self, check: HealthCheck) -> None: + """Register a health check instance to be run as part of the monitor. + + Args: + check (HealthCheck): The check instance to register. + """ + self.checks.append(check) + + def run_all(self) -> dict: + """Run all registered checks and return an aggregated health report. + + Returns: + dict: A report containing the timestamp, total weighted score, + and a list of individual check results. + """ + results = [] + total_weighted_score = 0 + total_weight = 0 + + for check in self.checks: + try: + result = check.run() + results.append(result) + total_weighted_score += result.score * result.weight + total_weight += result.weight + except Exception as e: + console.print(f"[red]Error running check {check.__class__.__name__}: {e}[/red]") + + final_score = 0 + if total_weight > 0: + final_score = int(total_weighted_score / total_weight) + + report = { + "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"), + "total_score": final_score, + "results": [ + { + "name": r.name, + "category": r.category, + "score": r.score, + "status": r.status, + "details": r.details, + "recommendation": r.recommendation, + } + for r in results + ], + } + + self._save_history(report) + return report + + def _save_history(self, report: dict) -> None: + """Save the current health report to the history JSON file. + + Args: + report (dict): The health report to save. + """ + history = [] + if self.history_file.exists(): + try: + with open(self.history_file) as f: + history = json.load(f) + except json.JSONDecodeError: + pass + + history.append(report) + # Keep only the last 100 records + history = history[-100:] + + try: + with open(self.history_file, "w") as f: + json.dump(history, f, indent=4) + except Exception as e: + console.print(f"[yellow]Warning: Could not save health history: {e}[/yellow]") diff --git a/scripts/verify_ubuntu_compatibility.py b/scripts/verify_ubuntu_compatibility.py new file mode 100644 index 00000000..629e2dd1 --- /dev/null +++ b/scripts/verify_ubuntu_compatibility.py @@ -0,0 +1,291 @@ +import datetime +import json +import os +import pathlib +import shutil +import subprocess +import tempfile + +# Use absolute path for history file +HISTORY_FILE = pathlib.Path.home() / ".cortex" / "security_history.json" + +# Command constants (full paths for security - avoids PATH manipulation attacks) +SYSTEMCTL_CMD = "/usr/bin/systemctl" +UFW_CMD = "/usr/sbin/ufw" +SUDO_CMD = "/usr/bin/sudo" +SSH_SERVICE = "ssh" + + +def load_history(): + """Load past execution history""" + if HISTORY_FILE.exists(): + try: + with open(HISTORY_FILE) as f: + return json.load(f) + except json.JSONDecodeError: + return [] + return [] + + +def save_history(score, status, details): + """Save execution result to history""" + HISTORY_FILE.parent.mkdir(parents=True, exist_ok=True) + + history = load_history() + record = { + "timestamp": datetime.datetime.now().isoformat(), + "score": score, + "status": status, + "details": details, + } + history.append(record) + history = history[-10:] + + with open(HISTORY_FILE, "w") as f: + json.dump(history, f, indent=4) + + return history + + +def show_trend(history): + """Show historical trend (Trend Tracking)""" + print("\n=== šŸ“Š Historical Trend Analysis ===") + if not history: + print(" No historical data available yet.") + return + + scores = [h["score"] for h in history] + avg_score = sum(scores) / len(scores) + last_score = scores[-1] + + print(f" History Count: {len(history)} runs") + print(f" Average Score: {avg_score:.1f}") + print(f" Last Run Score: {last_score}") + + if len(scores) > 1: + prev_score = scores[-2] + diff = last_score - prev_score + if diff > 0: + print(f" Trend: šŸ“ˆ Improved by {diff} points since previous run") + elif diff < 0: + print(f" Trend: šŸ“‰ Dropped by {abs(diff)} points since previous run") + else: + print(" Trend: āž”ļø Stable") + + +def fix_firewall(): + """Enable UFW firewall (Automated Fix). + + Uses sudo -n (non-interactive) to avoid password prompts hanging the script. + """ + print("\n [Fixing] Enabling UFW Firewall...") + + if not shutil.which("ufw") and not os.path.exists(UFW_CMD): + print(" -> āš ļø UFW is not installed. Cannot enable.") + return False + + try: + subprocess.run( + [SUDO_CMD, "-n", UFW_CMD, "--force", "enable"], + check=True, + timeout=30, + capture_output=True, + text=True, + ) + print(" -> āœ… Success: Firewall enabled.") + return True + except subprocess.CalledProcessError as e: + print(f" -> āŒ Failed to enable firewall: {e.stderr or e}") + return False + except subprocess.TimeoutExpired: + print(" -> āŒ Command timed out.") + return False + + +def fix_ssh_config(config_path): + """Disable SSH root login (Automated Fix). + + Uses atomic write pattern to prevent partial file corruption. + Creates a backup before making changes. + """ + print(f"\n [Fixing] Disabling Root Login in {config_path}...") + + if not os.path.exists(config_path): + print(f" -> āš ļø Config file not found: {config_path}") + return False + + timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") + backup_path = f"{config_path}.bak.{timestamp}" + + try: + shutil.copy2(config_path, backup_path) + print(f" -> Backup created at: {backup_path}") + except PermissionError: + print(" -> āŒ Failed to create backup (Permission denied). Need sudo?") + return False + except OSError as e: + print(f" -> āŒ Failed to create backup: {e}") + return False + + try: + new_lines = [] + with open(config_path) as f: + lines = f.readlines() + + fixed = False + for line in lines: + if line.strip().startswith("PermitRootLogin") and "yes" in line: + new_lines.append(f"# {line.strip()} (Disabled by Auto-Fix)\n") + new_lines.append("PermitRootLogin no\n") + fixed = True + else: + new_lines.append(line) + + if fixed: + # Atomic write using temporary file + dir_path = os.path.dirname(config_path) + with tempfile.NamedTemporaryFile( + mode="w", dir=dir_path, delete=False, suffix=".tmp" + ) as tmp_file: + tmp_file.writelines(new_lines) + tmp_path = tmp_file.name + + # Atomic replace + os.replace(tmp_path, config_path) + print(" -> āœ… Success: sshd_config updated.") + + print(" -> Restarting sshd service...") + res = subprocess.run( + [SUDO_CMD, "-n", SYSTEMCTL_CMD, "restart", SSH_SERVICE], + capture_output=True, + text=True, + timeout=30, + ) + if res.returncode != 0: + print(f" -> āš ļø SSH restart failed: {res.stderr}") + print(" -> Please restart SSH service manually.") + return True + else: + print(" -> No changes needed.") + return True + + except PermissionError: + print(" -> āŒ Permission denied. Try running with sudo.") + return False + except OSError as e: + print(f" -> āŒ Error during fix: {e}") + return False + + +def _check_firewall_status(): + """Helper to check firewall status.""" + print("\n[1] Checking Firewall (UFW)...") + try: + print(f" Running: {SYSTEMCTL_CMD} is-active ufw") + res = subprocess.run( + [SYSTEMCTL_CMD, "is-active", "ufw"], capture_output=True, text=True, timeout=10 + ) + output = res.stdout.strip() + print(f" Output: '{output}'") + + if res.returncode == 0 and output == "active": + print(" -> JUDGEMENT: Firewall is ACTIVE (Score: 100)") + return True + else: + print(" -> JUDGEMENT: Firewall is INACTIVE (Score: 0)") + return False + + except FileNotFoundError: + print(" -> ERROR: 'systemctl' command not found.") + except subprocess.TimeoutExpired: + print(" -> ERROR: Command timed out.") + except OSError as e: + print(f" -> ERROR: {e}") + return False + + +def _check_ssh_status(ssh_config): + """Helper to check SSH status.""" + print("\n[2] Checking SSH Configuration...") + score_penalty = 0 + needs_fix = False + + if os.path.exists(ssh_config): + print(f" File found: {ssh_config}") + try: + with open(ssh_config) as f: + for line in f: + parts = line.split() + if len(parts) >= 2 and parts[0] == "PermitRootLogin" and parts[1] == "yes": + print(f" -> FOUND RISKY LINE: {line.strip()}") + score_penalty = 50 + needs_fix = True + break + + if not needs_fix: + print(" -> No 'PermitRootLogin yes' found (Safe)") + + except PermissionError: + print(" -> ERROR: Permission denied. Try running with 'sudo'.") + else: + print(f" -> WARNING: {ssh_config} does not exist.") + + return score_penalty, needs_fix + + +def verify_security_logic(): + print("=== Ubuntu Security Logic Verification ===") + + ufw_active = _check_firewall_status() + ssh_config = "/etc/ssh/sshd_config" + ssh_penalty, ssh_needs_fix = _check_ssh_status(ssh_config) + + # Final Report + print("\n=== Summary ===") + final_score = 100 + if not ufw_active: + final_score = 0 + final_score -= ssh_penalty + final_score = max(0, final_score) + + status = "OK" + if final_score < 50: + status = "CRITICAL" + elif final_score < 100: + status = "WARNING" + + print(f"Current Score: {final_score}") + print(f"Status: {status}") + + # History + print("\n... Saving history ...") + details = [] + ufw_needs_fix = not ufw_active + if ufw_needs_fix: + details.append("Firewall Inactive") + if ssh_needs_fix: + details.append("Root SSH Allowed") + + history = save_history(final_score, status, ", ".join(details)) + show_trend(history) + + # Automated Fixes + if ufw_needs_fix or ssh_needs_fix: + print("\n=== šŸ› ļø Automated Fixes Available ===") + print("Issues detected that can be automatically fixed.") + user_input = input("Do you want to apply fixes now? (y/n): ").strip().lower() + + if user_input == "y": + if ufw_needs_fix: + fix_firewall() + if ssh_needs_fix: + fix_ssh_config(ssh_config) + print("\nāœ… Fixes attempt complete. Please re-run script to verify.") + else: + print("Skipping fixes.") + + +if __name__ == "__main__": + if os.geteuid() != 0: + print("NOTE: This script works best with 'sudo' for fixing issues.") + verify_security_logic() diff --git a/tests/test_health_monitor.py b/tests/test_health_monitor.py new file mode 100644 index 00000000..087357bc --- /dev/null +++ b/tests/test_health_monitor.py @@ -0,0 +1,145 @@ +import unittest +from unittest.mock import MagicMock, mock_open, patch + +from cortex.health.checks.disk import DiskCheck +from cortex.health.checks.performance import PerformanceCheck +from cortex.health.checks.security import SecurityCheck +from cortex.health.checks.updates import UpdateCheck +from cortex.health.monitor import CheckResult, HealthMonitor + + +class TestDiskCheck(unittest.TestCase): + @patch("shutil.disk_usage") + def test_disk_usage_scoring(self, mock_usage): + # Case 1: Healthy (50% used) -> 100 pts + # total=100, used=50, free=50 + mock_usage.return_value = (100, 50, 50) + check = DiskCheck() + result = check.run() + self.assertEqual(result.score, 100) + self.assertEqual(result.status, "OK") + + # Case 2: Warning (85% used) -> 50 pts + mock_usage.return_value = (100, 85, 15) + result = check.run() + self.assertEqual(result.score, 50) + self.assertEqual(result.status, "WARNING") + + # Case 3: Critical (95% used) -> 0 pts + mock_usage.return_value = (100, 95, 5) + result = check.run() + self.assertEqual(result.score, 0) + self.assertEqual(result.status, "CRITICAL") + + +class TestPerformanceCheck(unittest.TestCase): + @patch("os.getloadavg") + @patch("multiprocessing.cpu_count") + def test_load_average(self, mock_cpu, mock_load): + # Case 1: Load OK (Load 2.0 / 4 Cores = 0.5 ratio) + mock_cpu.return_value = 4 + mock_load.return_value = (2.0, 2.0, 2.0) + + # Mock reading /proc/meminfo (Normal case) + mem_data = "MemTotal: 1000 kB\nMemAvailable: 500 kB\n" + with patch("builtins.open", mock_open(read_data=mem_data)): + check = PerformanceCheck() + result = check.run() + self.assertEqual(result.score, 100) # No penalty + + @patch("os.getloadavg") + @patch("multiprocessing.cpu_count") + def test_high_load_penalty(self, mock_cpu, mock_load): + # Case 2: High Load (Load 5.0 / 4 Cores = 1.25 ratio) -> -50 pts + mock_cpu.return_value = 4 + mock_load.return_value = (5.0, 5.0, 5.0) + + # Assume memory is normal + mem_data = "MemTotal: 1000 kB\nMemAvailable: 500 kB\n" + with patch("builtins.open", mock_open(read_data=mem_data)): + check = PerformanceCheck() + result = check.run() + self.assertEqual(result.score, 50) # 100 - 50 = 50 + + +class TestSecurityCheck(unittest.TestCase): + @patch("subprocess.run") + def test_ufw_status(self, mock_run): + # Case 1: UFW Inactive -> 0 pts + mock_run.return_value.stdout = "inactive" + mock_run.return_value.returncode = 0 + + check = SecurityCheck() + result = check.run() + self.assertEqual(result.score, 0) + self.assertIn("Firewall Inactive", result.details) + + @patch("subprocess.run") + def test_ufw_active(self, mock_run): + # Case 2: UFW Active -> 100 pts (SSH config is safe by default mock) + mock_run.return_value.stdout = "active" + mock_run.return_value.returncode = 0 + + # Test error handling when sshd_config does not exist + with patch("os.path.exists", return_value=False): + check = SecurityCheck() + result = check.run() + self.assertEqual(result.score, 100) + + +class TestUpdateCheck(unittest.TestCase): + @patch("subprocess.run") + def test_apt_updates(self, mock_run): + # Mock output for apt list --upgradable + # Ignore first line, packages start from 2nd line + apt_output = """Listing... Done +package1/stable 1.0.0 amd64 [upgradable from: 0.9.9] +package2/stable 2.0.0 amd64 [upgradable from: 1.9.9] +security-pkg/stable 1.0.1 amd64 [upgradable from: 1.0.0] - Security Update +""" + mock_run.return_value.stdout = apt_output + mock_run.return_value.returncode = 0 + + check = UpdateCheck() + result = check.run() + + # Calculation: + # Total packages: 3 (but security counted separately) + # Regular packages: 2 (pkg_count) + # Security packages: 1 (sec_count) + # Penalty: (2 * 2) + (1 * 10) = 4 + 10 = 14 pts + # Expected score: 100 - 14 = 86 pts + + self.assertEqual(result.score, 86) + self.assertIn("2 packages", result.details) + + +class TestHealthMonitor(unittest.TestCase): + def test_monitor_aggregation(self): + monitor = HealthMonitor() + # Register mock checks instead of real check classes + + mock_check1 = MagicMock() + mock_check1.run.return_value = CheckResult( + name="Check1", category="test", score=100, status="OK", details="", weight=0.5 + ) + + mock_check2 = MagicMock() + mock_check2.run.return_value = CheckResult( + name="Check2", category="test", score=0, status="CRITICAL", details="", weight=0.5 + ) + + monitor.checks = [mock_check1, mock_check2] + + # Mock history saving to prevent file write + with patch.object(monitor, "_save_history"): + report = monitor.run_all() + + # Weighted average calculation: + # (100 * 0.5) + (0 * 0.5) = 50 / (0.5 + 0.5) = 50 pts + self.assertEqual(report["total_score"], 50) + self.assertEqual(len(report["results"]), 2) + + +if __name__ == "__main__": + unittest.main()