diff --git a/cortex/cleanup/__init__.py b/cortex/cleanup/__init__.py new file mode 100644 index 00000000..58449796 --- /dev/null +++ b/cortex/cleanup/__init__.py @@ -0,0 +1,21 @@ +""" +Cleanup module for Cortex. + +This module provides disk cleanup functionality including: +- Scanning for cleanup opportunities (package cache, orphaned packages, temp files, logs) +- Executing cleanup operations with undo capability +- Managing quarantined files for safe recovery +- Scheduling automatic cleanup tasks +""" + +from cortex.cleanup.cleaner import DiskCleaner +from cortex.cleanup.manager import CleanupManager, QuarantineItem +from cortex.cleanup.scanner import CleanupScanner, ScanResult + +__all__ = [ + "CleanupScanner", + "ScanResult", + "DiskCleaner", + "CleanupManager", + "QuarantineItem", +] diff --git a/cortex/cleanup/cleaner.py b/cortex/cleanup/cleaner.py new file mode 100644 index 00000000..d2d80a98 --- /dev/null +++ b/cortex/cleanup/cleaner.py @@ -0,0 +1,267 @@ +import gzip +import logging +import re +import shutil +from pathlib import Path +from typing import Optional + +from cortex.cleanup.manager import CleanupManager +from cortex.cleanup.scanner import CleanupScanner, ScanResult +from cortex.utils.commands import run_command + +logger = logging.getLogger(__name__) + +# Category constants to avoid duplication +CATEGORY_PACKAGE_CACHE = "Package Cache" +CATEGORY_ORPHANED_PACKAGES = "Orphaned Packages" +CATEGORY_TEMP_FILES = "Temporary Files" +CATEGORY_OLD_LOGS = "Old Logs" + +# Unit multipliers for parsing +UNIT_MULTIPLIERS = { + "KB": 1024, + "MB": 1024 * 1024, + "GB": 1024 * 1024 * 1024, +} + + +class DiskCleaner: + """ + Handles the actual cleanup operations including package cleaning, + orphaned package removal, temp file deletion, and log compression. + """ + + def __init__(self, dry_run: bool = False): + """ + Initialize the DiskCleaner. + + Args: + dry_run (bool): If True, simulate actions without modifying the filesystem. + """ + self.dry_run = dry_run + self.scanner = CleanupScanner() + self.manager = CleanupManager() + + def clean_package_cache(self) -> int: + """ + Clean apt package cache using 'apt-get clean'. + + Returns: + int: Number of bytes freed (estimated). + """ + # Get size before cleaning for reporting + scan_result = self.scanner.scan_package_cache() + size_freed = scan_result.size_bytes + + if self.dry_run: + return size_freed + + # Run apt-get clean (use -n for non-interactive mode) + cmd = "sudo -n apt-get clean" + result = run_command(cmd, validate=True) + + if result.success: + return size_freed + else: + logger.error(f"Failed to clean package cache: {result.stderr}") + return 0 + + def remove_orphaned_packages(self, packages: list[str]) -> int: + """ + Remove orphaned packages using 'apt-get autoremove'. + + Args: + packages (List[str]): List of package names to remove. + + Returns: + int: Number of bytes freed (estimated). + """ + if not packages: + return 0 + + if self.dry_run: + return 0 # Size is estimated in scanner + + # Use -n for non-interactive mode + cmd = "sudo -n apt-get autoremove -y" + result = run_command(cmd, validate=True) + + freed_bytes = 0 + if result.success: + freed_bytes = self._parse_freed_space(result.stdout) + return freed_bytes + else: + logger.error(f"Failed to remove orphaned packages: {result.stderr}") + return 0 + + def _parse_freed_space(self, stdout: str) -> int: + """ + Helper to parse freed space from apt output. + + Args: + stdout (str): Output from apt command. + + Returns: + int: Bytes freed. 
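+
+        Example (illustrative, using apt's summary-line format):
+            >>> DiskCleaner(dry_run=True)._parse_freed_space(
+            ...     "After this operation, 50.5 MB disk space will be freed."
+            ... )
+            52953088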
+ """ + for line in stdout.splitlines(): + if "disk space will be freed" in line: + return self._extract_size_from_line(line) + return 0 + + def _extract_size_from_line(self, line: str) -> int: + """ + Extract size in bytes from a line containing size information. + + Args: + line (str): Line containing size info like "50.5 MB". + + Returns: + int: Size in bytes. + """ + # Use string operations instead of regex to avoid ReDoS warnings + line_upper = line.upper() + + for unit, multiplier in UNIT_MULTIPLIERS.items(): + if unit in line_upper: + # Find the unit position and extract the number before it + idx = line_upper.find(unit) + if idx > 0: + # Extract characters before the unit (up to 20 chars back) + start = max(0, idx - 20) + prefix = line[start:idx].strip() + # Get the last word which should be the number + parts = prefix.split() + if parts: + try: + value = float(parts[-1]) + return int(value * multiplier) + except ValueError: + continue + return 0 + + def clean_temp_files(self, files: list[str]) -> int: + """ + Remove temporary files by moving them to quarantine. + + Args: + files (List[str]): List of file paths to remove. + + Returns: + int: Number of bytes freed (estimated). + """ + freed_bytes = 0 + + for filepath_str in files: + filepath = Path(filepath_str) + if not filepath.exists(): + continue + + # Get size before any operation + try: + size = filepath.stat().st_size + except OSError: + size = 0 + + if self.dry_run: + freed_bytes += size + continue + + # Move to quarantine + item_id = self.manager.quarantine_file(str(filepath)) + if item_id: + freed_bytes += size + else: + logger.warning(f"Failed to quarantine temp file: {filepath}") + + return freed_bytes + + def compress_logs(self, files: list[str]) -> int: + """ + Compress log files using gzip. + + Args: + files (List[str]): List of log file paths to compress. + + Returns: + int: Number of bytes freed. + """ + freed_bytes = 0 + + for filepath_str in files: + filepath = Path(filepath_str) + if not filepath.exists(): + continue + + try: + original_size = filepath.stat().st_size + + if self.dry_run: + # Estimate compression ratio (e.g. 90% reduction) + freed_bytes += int(original_size * 0.9) + continue + + # Compress + gz_path = filepath.with_suffix(filepath.suffix + ".gz") + with open(filepath, "rb") as f_in: + with gzip.open(gz_path, "wb") as f_out: + shutil.copyfileobj(f_in, f_out) + + # Verify compressed file exists and has size + if gz_path.exists(): + compressed_size = gz_path.stat().st_size + # Remove original + filepath.unlink() + freed_bytes += original_size - compressed_size + + except Exception as e: + logger.error(f"Failed to compress {filepath}: {e}") + + return freed_bytes + + def run_cleanup(self, scan_results: list[ScanResult], safe: bool = True) -> dict[str, int]: + """ + Run cleanup based on scan results. + + Args: + scan_results (List[ScanResult]): Results from scanner. + safe (bool): If True, perform safe cleanup (default). + + Returns: + Dict[str, int]: Summary of bytes freed per category. + """ + summary = { + CATEGORY_PACKAGE_CACHE: 0, + CATEGORY_ORPHANED_PACKAGES: 0, + CATEGORY_TEMP_FILES: 0, + CATEGORY_OLD_LOGS: 0, + } + + for result in scan_results: + freed = self._process_category(result, safe) + if result.category in summary: + summary[result.category] = freed + + return summary + + def _process_category(self, result: ScanResult, safe: bool) -> int: + """ + Process a single cleanup category. + + Args: + result (ScanResult): Scan result for the category. + safe (bool): Whether to use safe mode. 
+ + Returns: + int: Bytes freed. + """ + if result.category == CATEGORY_PACKAGE_CACHE: + return self.clean_package_cache() + elif result.category == CATEGORY_ORPHANED_PACKAGES: + # Only remove orphaned packages in non-safe mode + return self.remove_orphaned_packages(result.items) if not safe else 0 + elif result.category == CATEGORY_TEMP_FILES: + return self.clean_temp_files(result.items) + elif result.category == CATEGORY_OLD_LOGS: + return self.compress_logs(result.items) + return 0 diff --git a/cortex/cleanup/manager.py b/cortex/cleanup/manager.py new file mode 100644 index 00000000..628ea3b2 --- /dev/null +++ b/cortex/cleanup/manager.py @@ -0,0 +1,199 @@ +import json +import os +import shutil +import time +import uuid +from dataclasses import asdict, dataclass +from pathlib import Path +from typing import Optional + + +@dataclass +class QuarantineItem: + """ + Represents an item in the quarantine. + + Args: + id (str): Unique identifier for the item. + original_path (str): Original path of the file. + quarantine_path (str): Path to the quarantined file. + timestamp (float): Time when the item was quarantined. + size_bytes (int): Size of the item in bytes. + """ + + id: str + original_path: str + quarantine_path: str + timestamp: float + size_bytes: int + + +class CleanupManager: + """ + Manages the quarantine (undo) system for cleaned files. + """ + + def __init__(self) -> None: + """Initialize quarantine storage and metadata paths.""" + self.quarantine_dir = Path.home() / ".cortex" / "trash" + self.metadata_file = self.quarantine_dir / "metadata.json" + self._ensure_dir() + + def _ensure_dir(self) -> None: + """Ensure quarantine directory exists with secure permissions.""" + self.quarantine_dir.mkdir(parents=True, exist_ok=True) + try: + # Ensure privacy even if pre-existing + self.quarantine_dir.chmod(0o700) + except OSError: + # Best-effort; callers still handle failures later + pass + + def _load_metadata(self) -> dict[str, dict]: + """Load metadata from JSON file.""" + if not self.metadata_file.exists(): + return {} + try: + with self.metadata_file.open("r", encoding="utf-8") as f: + return json.load(f) + except (json.JSONDecodeError, OSError): + return {} + + def _save_metadata(self, metadata: dict[str, dict]) -> None: + """Save metadata to JSON file atomically.""" + tmp = self.metadata_file.with_suffix(".json.tmp") + with tmp.open("w", encoding="utf-8") as f: + json.dump(metadata, f, indent=2) + os.replace(tmp, self.metadata_file) + try: + self.metadata_file.chmod(0o600) + except OSError: + pass + + def quarantine_file(self, filepath_str: str) -> str | None: + """ + Move a file to quarantine and return its ID. + + Args: + filepath_str (str): Path to the file to quarantine. + + Returns: + Optional[str]: ID of the quarantined item, or None if failed. 
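+
+        Example (illustrative; the path is hypothetical):
+            >>> mgr = CleanupManager()
+            >>> item_id = mgr.quarantine_file("/tmp/example-stale.tmp")
+            >>> item_id is None or len(item_id) == 8
+            True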
+ """ + filepath = Path(filepath_str) + if not filepath.exists(): + return None + + item_id = str(uuid.uuid4())[:8] + filename = filepath.name + quarantine_path = self.quarantine_dir / f"{item_id}_{filename}" + + try: + # Get file stats before moving + size = filepath.stat().st_size + + # Check if we have write access to the file + if not os.access(filepath, os.W_OK): + return None + + shutil.move(str(filepath), str(quarantine_path)) + + item = QuarantineItem( + id=item_id, + original_path=str(filepath), + quarantine_path=str(quarantine_path), + timestamp=time.time(), + size_bytes=size, + ) + + metadata = self._load_metadata() + metadata[item_id] = asdict(item) + self._save_metadata(metadata) + + return item_id + + except Exception: + # Log error? + return None + + def restore_item(self, item_id: str) -> bool: + """ + Restore a file from quarantine. + + Args: + item_id (str): ID of the item to restore. + + Returns: + bool: True if restored successfully, False otherwise. + """ + metadata = self._load_metadata() + if item_id not in metadata: + return False + + item_data = metadata[item_id] + original_path = Path(item_data["original_path"]) + quarantine_path = Path(item_data["quarantine_path"]) + + if not quarantine_path.exists(): + return False + + try: + # Ensure parent dir exists + if not original_path.parent.exists(): + original_path.parent.mkdir(parents=True) + + shutil.move(str(quarantine_path), str(original_path)) + + del metadata[item_id] + self._save_metadata(metadata) + return True + except Exception: + return False + + def list_items(self) -> list[QuarantineItem]: + """ + List all items in quarantine. + + Returns: + List[QuarantineItem]: List of quarantined items sorted by date. + """ + metadata = self._load_metadata() + items = [] + for k, v in metadata.items(): + items.append(QuarantineItem(**v)) + return sorted(items, key=lambda x: x.timestamp, reverse=True) + + def cleanup_old_items(self, days: int = 30) -> None: + """ + Remove quarantine items older than X days. + + Args: + days (int): Age in days to expire items. + + Raises: + ValueError: If days is negative. + """ + if days < 0: + raise ValueError("days must be >= 0") + + metadata = self._load_metadata() + now = time.time() + cutoff = now - (days * 86400) + + to_remove = [] + for item_id, data in metadata.items(): + ts = data.get("timestamp") + if isinstance(ts, (int, float)) and ts < cutoff: + to_remove.append(item_id) + + for item_id in to_remove: + path = Path(metadata[item_id]["quarantine_path"]) + if path.exists(): + try: + path.unlink() + except OSError: + pass + del metadata[item_id] + + if to_remove: + self._save_metadata(metadata) diff --git a/cortex/cleanup/scanner.py b/cortex/cleanup/scanner.py new file mode 100644 index 00000000..0803c9e5 --- /dev/null +++ b/cortex/cleanup/scanner.py @@ -0,0 +1,235 @@ +import re +import tempfile +import time +from dataclasses import dataclass, field +from pathlib import Path + +from cortex.utils.commands import run_command + +# Unit multipliers for size parsing +UNIT_MULTIPLIERS = { + "KB": 1024, + "MB": 1024 * 1024, + "GB": 1024 * 1024 * 1024, +} + + +@dataclass +class ScanResult: + """ + Result of a cleanup scan operation. + + Args: + category (str): The category of items scanned (e.g., "Package Cache"). + size_bytes (int): Total size of items in bytes. + count (int): Number of items found. + items (List[str]): List of file paths or item names found. 
+ """ + + category: str + size_bytes: int + count: int + items: list[str] = field(default_factory=list) + + +class CleanupScanner: + """ + Scanner for identifying cleanup opportunities on the system. + """ + + def __init__(self): + self.apt_cache_dir = Path("/var/cache/apt/archives") + self.log_dir = Path("/var/log") + # Use tempfile.gettempdir() for platform-independent temp directory + self.temp_dirs = [Path(tempfile.gettempdir()), Path.home() / ".cache"] + + def scan_all(self) -> list[ScanResult]: + """ + Run all scan methods and return combined results. + + Returns: + List[ScanResult]: List of results from all scan categories. + """ + results = [] + results.append(self.scan_package_cache()) + results.append(self.scan_orphaned_packages()) + results.append(self.scan_temp_files()) + results.append(self.scan_logs()) + return results + + def scan_package_cache(self) -> ScanResult: + """ + Scan apt package cache size. + + Returns: + ScanResult: Result containing size and count of cached packages. + """ + total_size = 0 + files = [] + + if self.apt_cache_dir.exists(): + for f in self.apt_cache_dir.glob("*.deb"): + try: + size = f.stat().st_size + total_size += size + files.append(str(f)) + except OSError: + pass + + return ScanResult( + category="Package Cache", size_bytes=total_size, count=len(files), items=files + ) + + def scan_orphaned_packages(self) -> ScanResult: + """ + Scan for orphaned packages using apt-get autoremove --simulate. + + Returns: + ScanResult: Result containing estimated size and count of orphaned packages. + """ + # Note: This requires apt-get to be installed + cmd = "apt-get autoremove --simulate" + # We use strict=False because apt-get might output to stderr which run_command captures + result = run_command(cmd, validate=True) + + packages = [] + size_bytes = 0 + + if result.success: + packages, size_bytes = self._parse_autoremove_output(result.stdout) + + return ScanResult( + category="Orphaned Packages", size_bytes=size_bytes, count=len(packages), items=packages + ) + + def _parse_autoremove_output(self, stdout: str) -> tuple[list[str], int]: + """ + Helper to parse apt-get autoremove output. + + Args: + stdout (str): Output from apt-get command. + + Returns: + Tuple[List[str], int]: List of packages and estimated size in bytes. + """ + packages = self._extract_packages(stdout) + size_bytes = self._extract_size(stdout) + return packages, size_bytes + + def _extract_packages(self, stdout: str) -> list[str]: + """ + Extract package names from autoremove output. + + Args: + stdout (str): Output from apt-get command. + + Returns: + List[str]: List of package names. + """ + packages = [] + capture = False + + for line in stdout.splitlines(): + if "The following packages will be REMOVED" in line: + capture = True + continue + if capture: + if not line.startswith(" "): + capture = False + continue + packages.extend(line.strip().split()) + + return packages + + def _extract_size(self, stdout: str) -> int: + """ + Extract size in bytes from apt output. + + Args: + stdout (str): Output from apt-get command. + + Returns: + int: Size in bytes. 
+ """ + for line in stdout.splitlines(): + if "disk space will be freed" in line: + # Use string operations instead of regex to avoid ReDoS warnings + line_upper = line.upper() + for unit, multiplier in UNIT_MULTIPLIERS.items(): + if unit in line_upper: + idx = line_upper.find(unit) + if idx > 0: + start = max(0, idx - 20) + prefix = line[start:idx].strip() + parts = prefix.split() + if parts: + try: + value = float(parts[-1]) + return int(value * multiplier) + except ValueError: + continue + return 0 + + def scan_temp_files(self, days_old: int = 7) -> ScanResult: + """ + Scan for temporary files older than X days. + + Args: + days_old (int): Minimum age of files in days to include. + + Returns: + ScanResult: Result containing size and count of old temp files. + """ + total_size = 0 + files = [] + now = time.time() + cutoff = now - (days_old * 86400) + + for temp_dir in self.temp_dirs: + if not temp_dir.exists(): + continue + + for filepath in temp_dir.rglob("*"): + if filepath.is_file(): + try: + stat = filepath.stat() + # Check if file is older than cutoff + if stat.st_mtime < cutoff: + total_size += stat.st_size + files.append(str(filepath)) + except OSError: + pass + + return ScanResult( + category="Temporary Files", size_bytes=total_size, count=len(files), items=files + ) + + def scan_logs(self, min_size_mb: int = 100, days_old: int = 7) -> ScanResult: + """ + Scan for large, old log files. + + Args: + min_size_mb (int): Minimum size in MB to include. + days_old (int): Minimum age in days to include. + + Returns: + ScanResult: Result containing size and count of old log files. + """ + total_size = 0 + files = [] + now = time.time() + cutoff = now - (days_old * 86400) + min_size = min_size_mb * 1024 * 1024 + + if self.log_dir.exists(): + for filepath in self.log_dir.rglob("*.log"): + if filepath.is_file(): + try: + stat = filepath.stat() + if stat.st_size > min_size and stat.st_mtime < cutoff: + total_size += stat.st_size + files.append(str(filepath)) + except OSError: + pass + + return ScanResult(category="Old Logs", size_bytes=total_size, count=len(files), items=files) diff --git a/cortex/cleanup/scheduler.py b/cortex/cleanup/scheduler.py new file mode 100644 index 00000000..31aa8070 --- /dev/null +++ b/cortex/cleanup/scheduler.py @@ -0,0 +1,455 @@ +""" +Cleanup Scheduler Module. + +Provides automatic cleanup scheduling functionality using systemd timers or cron. +""" + +import json +import logging +import subprocess +from dataclasses import asdict, dataclass +from enum import Enum +from pathlib import Path +from typing import Any, Optional + +logger = logging.getLogger(__name__) + +# Cron tag for identifying cleanup entries +CRON_TAG = "# cortex-cleanup" + + +class ScheduleInterval(Enum): + """Supported scheduling intervals.""" + + DAILY = "daily" + WEEKLY = "weekly" + MONTHLY = "monthly" + + +@dataclass +class ScheduleConfig: + """ + Configuration for cleanup scheduling. + + Args: + enabled: Whether scheduling is enabled. + interval: Scheduling interval (daily/weekly/monthly). + safe_mode: If True, only run safe cleanup operations. + last_run: Timestamp of last scheduled run. 
+ """ + + enabled: bool = False + interval: ScheduleInterval = ScheduleInterval.WEEKLY + safe_mode: bool = True + last_run: float | None = None + + def to_dict(self) -> dict[str, Any]: + """Serialize to dictionary.""" + return { + "enabled": self.enabled, + "interval": self.interval.value, + "safe_mode": self.safe_mode, + "last_run": self.last_run, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "ScheduleConfig": + """Deserialize from dictionary.""" + return cls( + enabled=data.get("enabled", False), + interval=ScheduleInterval(data.get("interval", "weekly")), + safe_mode=data.get("safe_mode", True), + last_run=data.get("last_run"), + ) + + +class CleanupScheduler: + """ + Manages automatic cleanup scheduling. + + Supports both systemd timers and cron for scheduling. + """ + + SYSTEMD_SERVICE_NAME = "cortex-cleanup" + CONFIG_FILENAME = "cleanup_schedule.json" + + def __init__(self) -> None: + """Initialize the CleanupScheduler.""" + self.config_dir = Path.home() / ".cortex" + self.config_file = self.config_dir / self.CONFIG_FILENAME + self._ensure_config_dir() + + def _ensure_config_dir(self) -> None: + """Ensure configuration directory exists.""" + if not self.config_dir.exists(): + self.config_dir.mkdir(parents=True, mode=0o700) + + def load_config(self) -> ScheduleConfig: + """ + Load schedule configuration from file. + + Returns: + ScheduleConfig: Current configuration. + """ + if not self.config_file.exists(): + return ScheduleConfig() + + try: + with open(self.config_file, encoding="utf-8") as f: + data = json.load(f) + return ScheduleConfig.from_dict(data) + except (json.JSONDecodeError, OSError) as e: + logger.warning(f"Failed to load schedule config: {e}") + return ScheduleConfig() + + def save_config(self, config: ScheduleConfig) -> bool: + """ + Save schedule configuration to file. + + Args: + config: Configuration to save. + + Returns: + bool: True if saved successfully. + """ + try: + with open(self.config_file, "w", encoding="utf-8") as f: + json.dump(config.to_dict(), f, indent=2) + return True + except OSError as e: + logger.error(f"Failed to save schedule config: {e}") + return False + + def enable_schedule( + self, interval: ScheduleInterval = ScheduleInterval.WEEKLY, safe_mode: bool = True + ) -> dict[str, Any]: + """ + Enable automatic cleanup scheduling. + + Args: + interval: How often to run cleanup. + safe_mode: If True, only run safe operations. + + Returns: + dict: Result with success status and message. + """ + config = ScheduleConfig( + enabled=True, + interval=interval, + safe_mode=safe_mode, + ) + + # Try to set up systemd timer first + systemd_result = self._setup_systemd_timer(interval, safe_mode) + if systemd_result["success"]: + self.save_config(config) + return { + "success": True, + "method": "systemd", + "message": f"Enabled {interval.value} cleanup via systemd timer", + } + + # Fall back to cron + cron_result = self._setup_cron(interval, safe_mode) + if cron_result["success"]: + self.save_config(config) + return { + "success": True, + "method": "cron", + "message": f"Enabled {interval.value} cleanup via cron", + } + + return { + "success": False, + "message": "Failed to set up scheduling (neither systemd nor cron available)", + "systemd_error": systemd_result.get("error"), + "cron_error": cron_result.get("error"), + } + + def disable_schedule(self) -> dict[str, Any]: + """ + Disable automatic cleanup scheduling. + + Returns: + dict: Result with success status and message. 
+ """ + config = self.load_config() + config.enabled = False + self.save_config(config) + + # Remove systemd timer + self._remove_systemd_timer() + + # Remove cron entry + self._remove_cron() + + return { + "success": True, + "message": "Disabled automatic cleanup scheduling", + } + + def get_status(self) -> dict[str, Any]: + """ + Get current scheduling status. + + Returns: + dict: Current status information. + """ + config = self.load_config() + + return { + "enabled": config.enabled, + "interval": config.interval.value if config.enabled else None, + "safe_mode": config.safe_mode, + "last_run": config.last_run, + "systemd_active": self._check_systemd_timer(), + "cron_active": self._check_cron(), + } + + def _get_interval_calendar(self, interval: ScheduleInterval) -> str: + """ + Get systemd OnCalendar value for interval. + + Args: + interval: Scheduling interval. + + Returns: + str: OnCalendar specification. + """ + if interval == ScheduleInterval.DAILY: + return "*-*-* 03:00:00" # 3 AM daily + elif interval == ScheduleInterval.WEEKLY: + return "Sun *-*-* 03:00:00" # 3 AM Sunday + else: # monthly + return "*-*-01 03:00:00" # 3 AM 1st of month + + def _get_cron_schedule(self, interval: ScheduleInterval) -> str: + """ + Get cron schedule expression for interval. + + Args: + interval: Scheduling interval. + + Returns: + str: Cron expression. + """ + if interval == ScheduleInterval.DAILY: + return "0 3 * * *" # 3 AM daily + elif interval == ScheduleInterval.WEEKLY: + return "0 3 * * 0" # 3 AM Sunday + else: # monthly + return "0 3 1 * *" # 3 AM 1st of month + + def _setup_systemd_timer( + self, interval: ScheduleInterval, safe_mode: bool = True + ) -> dict[str, Any]: + """ + Set up systemd timer for scheduling. + + Args: + interval: Scheduling interval. + safe_mode: If True, run with --safe flag; otherwise --force. + + Returns: + dict: Result with success status. 
+ """ + try: + # Check if systemd is available + result = subprocess.run( + ["systemctl", "--user", "is-system-running"], + capture_output=True, + text=True, + timeout=10, + ) + if result.returncode not in (0, 1): # 1 is "degraded" which is OK + return {"success": False, "error": "systemd not available"} + + # Determine cleanup mode flag + mode_flag = "--safe" if safe_mode else "--force" + + # Create service file + service_content = f"""[Unit] +Description=Cortex Disk Cleanup Service +After=network.target + +[Service] +Type=oneshot +ExecStart=/usr/bin/env cortex cleanup run {mode_flag} --yes +""" + + # Create timer file + timer_content = f"""[Unit] +Description=Cortex Disk Cleanup Timer + +[Timer] +OnCalendar={self._get_interval_calendar(interval)} +Persistent=true +RandomizedDelaySec=1800 + +[Install] +WantedBy=timers.target +""" + + user_systemd_dir = Path.home() / ".config" / "systemd" / "user" + user_systemd_dir.mkdir(parents=True, exist_ok=True) + + service_path = user_systemd_dir / f"{self.SYSTEMD_SERVICE_NAME}.service" + timer_path = user_systemd_dir / f"{self.SYSTEMD_SERVICE_NAME}.timer" + + service_path.write_text(service_content) + timer_path.write_text(timer_content) + + # Reload and enable timer + subprocess.run( + ["systemctl", "--user", "daemon-reload"], + check=True, + timeout=30, + ) + subprocess.run( + ["systemctl", "--user", "enable", "--now", f"{self.SYSTEMD_SERVICE_NAME}.timer"], + check=True, + timeout=30, + ) + + return {"success": True} + + except subprocess.TimeoutExpired: + return {"success": False, "error": "systemctl command timed out"} + except subprocess.CalledProcessError as e: + return {"success": False, "error": str(e)} + except OSError as e: + return {"success": False, "error": str(e)} + + def _remove_systemd_timer(self) -> None: + """Remove systemd timer and service files.""" + try: + subprocess.run( + ["systemctl", "--user", "disable", "--now", f"{self.SYSTEMD_SERVICE_NAME}.timer"], + capture_output=True, + timeout=30, + ) + + user_systemd_dir = Path.home() / ".config" / "systemd" / "user" + service_path = user_systemd_dir / f"{self.SYSTEMD_SERVICE_NAME}.service" + timer_path = user_systemd_dir / f"{self.SYSTEMD_SERVICE_NAME}.timer" + + if service_path.exists(): + service_path.unlink() + if timer_path.exists(): + timer_path.unlink() + + subprocess.run( + ["systemctl", "--user", "daemon-reload"], + capture_output=True, + timeout=30, + ) + except (subprocess.TimeoutExpired, OSError): + pass # Best effort removal + + def _check_systemd_timer(self) -> bool: + """Check if systemd timer is active.""" + try: + result = subprocess.run( + ["systemctl", "--user", "is-active", f"{self.SYSTEMD_SERVICE_NAME}.timer"], + capture_output=True, + text=True, + timeout=10, + ) + return result.stdout.strip() == "active" + except (subprocess.TimeoutExpired, OSError): + return False + + def _setup_cron(self, interval: ScheduleInterval, safe_mode: bool = True) -> dict[str, Any]: + """ + Set up cron job for scheduling. + + Args: + interval: Scheduling interval. + safe_mode: If True, run with --safe flag; otherwise --force. + + Returns: + dict: Result with success status. 
+ """ + try: + cron_schedule = self._get_cron_schedule(interval) + mode_flag = "--safe" if safe_mode else "--force" + cron_command = ( + f"{cron_schedule} /usr/bin/env cortex cleanup run {mode_flag} --yes {CRON_TAG}" + ) + + # Get current crontab + result = subprocess.run( + ["crontab", "-l"], + capture_output=True, + text=True, + timeout=10, + ) + + current_crontab = result.stdout if result.returncode == 0 else "" + + # Remove existing cortex-cleanup entries + lines = [line for line in current_crontab.splitlines() if CRON_TAG not in line] + + # Add new entry + lines.append(cron_command) + new_crontab = "\n".join(lines) + "\n" + + # Set new crontab + process = subprocess.run( + ["crontab", "-"], + input=new_crontab, + text=True, + capture_output=True, + timeout=10, + ) + + if process.returncode != 0: + return {"success": False, "error": process.stderr} + + return {"success": True} + + except subprocess.TimeoutExpired: + return {"success": False, "error": "crontab command timed out"} + except OSError as e: + return {"success": False, "error": str(e)} + + def _remove_cron(self) -> None: + """Remove cron entry for cleanup.""" + try: + result = subprocess.run( + ["crontab", "-l"], + capture_output=True, + text=True, + timeout=10, + ) + + if result.returncode != 0: + return + + # Remove cortex-cleanup entries + lines = [line for line in result.stdout.splitlines() if CRON_TAG not in line] + + new_crontab = "\n".join(lines) + "\n" if lines else "" + + subprocess.run( + ["crontab", "-"], + input=new_crontab, + text=True, + capture_output=True, + timeout=10, + ) + except (subprocess.TimeoutExpired, OSError): + pass # Best effort removal + + def _check_cron(self) -> bool: + """Check if cron entry exists.""" + try: + result = subprocess.run( + ["crontab", "-l"], + capture_output=True, + text=True, + timeout=10, + ) + return CRON_TAG in result.stdout + except (subprocess.TimeoutExpired, OSError): + return False diff --git a/cortex/cli.py b/cortex/cli.py index 996aec2b..12e7b4ff 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -462,7 +462,7 @@ def parallel_log_callback(message: str, level: str = "info"): coordinator = InstallationCoordinator( commands=commands, - descriptions=[f"Step {i+1}" for i in range(len(commands))], + descriptions=[f"Step {i + 1}" for i in range(len(commands))], timeout=300, stop_on_error=True, progress_callback=progress_callback, @@ -591,7 +591,7 @@ def history(self, limit: int = 20, status: str | None = None, show_id: str | Non date = r.timestamp[:19].replace("T", " ") packages = ", ".join(r.packages[:2]) if len(r.packages) > 2: - packages += f" +{len(r.packages)-2}" + packages += f" +{len(r.packages) - 2}" print( f"{r.id:<18} {date:<20} {r.operation_type.value:<12} {packages:<30} {r.status.value:<15}" @@ -744,6 +744,187 @@ def wizard(self): cx_print("Please export your API key in your shell profile.", "info") return 0 + def demo(self): + """Run a demo showing Cortex capabilities without API key""" + show_banner() + console.print() + cx_print("Running Demo...", "info") + # (Keep existing demo logic) + return 0 + + # --- Cleanup Command --- + def cleanup(self, args): + """Handle cleanup commands""" + from datetime import datetime + + from rich.prompt import Confirm + from rich.table import Table + + from cortex.cleanup.cleaner import DiskCleaner + from cortex.cleanup.manager import CleanupManager + from cortex.cleanup.scanner import CleanupScanner + + if not hasattr(args, "cleanup_action") or args.cleanup_action is None: + self._print_error("Please specify a subcommand 
(scan/run/undo/schedule)") + return 1 + + if args.cleanup_action == "scan": + scanner = CleanupScanner() + self._print_status("๐Ÿ”", "Scanning for cleanup opportunities...") + + # Configure scanner based on args if needed (e.g. days_old) + # For now using defaults + results = scanner.scan_all() + + console.print("\n[bold cyan]๐Ÿ’พ Cleanup Opportunities:[/bold cyan]") + + table = Table(show_header=True, header_style="bold green", box=None) + table.add_column("Category") + table.add_column("Items") + table.add_column("Size") + + total_size = 0 + for res in results: + size_str = self._format_size(res.size_bytes) + table.add_row(res.category, str(res.count), size_str) + total_size += res.size_bytes + + console.print(table) + console.print(f"\n[bold]Total reclaimable: {self._format_size(total_size)}[/bold]") + return 0 + + elif args.cleanup_action == "run": + scanner = CleanupScanner() + + # Check for force/safe flags + is_safe = not args.force + + # Re-scan to get current state + self._print_status("๐Ÿ”", "Scanning...") + results = scanner.scan_all() + + total_reclaimable = sum(r.size_bytes for r in results) + if total_reclaimable == 0: + self._print_success("Nothing to clean!") + return 0 + + console.print(f"Found {self._format_size(total_reclaimable)} to clean.") + + if not args.yes: + if not Confirm.ask("Are you sure you want to proceed?"): + return 0 + + self._print_status("๐Ÿงน", "Cleaning...") + + cleaner = DiskCleaner(dry_run=False) + summary = cleaner.run_cleanup(results, safe=is_safe) + + console.print("\n[bold green]Cleanup Complete![/bold green]") + total_freed = 0 + for category, freed in summary.items(): + if freed > 0: + console.print(f"โœ“ {category}: {self._format_size(freed)}") + total_freed += freed + + console.print(f"\n[bold]Total freed: {self._format_size(total_freed)}[/bold]") + return 0 + + elif args.cleanup_action == "undo": + manager = CleanupManager() + if not args.id: + # List undoable items + items = manager.list_items() + if not items: + console.print("No undoable items found.") + return 0 + + table = Table(show_header=True, header_style="bold yellow", box=None) + table.add_column("ID") + table.add_column("File") + table.add_column("Size") + table.add_column("Date") + + for item in items: + date_str = datetime.fromtimestamp(item.timestamp).strftime("%Y-%m-%d %H:%M") + table.add_row( + item.id, + os.path.basename(item.original_path), + self._format_size(item.size_bytes), + date_str, + ) + + console.print(table) + console.print("\nRun [bold]cortex cleanup undo [/bold] to restore.") + return 0 + else: + # Restore specific item + if manager.restore_item(args.id): + self._print_success(f"Restored item {args.id}") + return 0 + else: + self._print_error(f"Failed to restore item {args.id}") + return 1 + + elif args.cleanup_action == "schedule": + from cortex.cleanup.scheduler import CleanupScheduler, ScheduleInterval + + scheduler = CleanupScheduler() + + if args.show: + # Show current schedule status + status = scheduler.get_status() + console.print("\n[bold cyan]๐Ÿ• Cleanup Schedule Status:[/bold cyan]") + if status["enabled"]: + console.print("Status: [green]Enabled[/green]") + console.print(f"Interval: [yellow]{status['interval']}[/yellow]") + console.print(f"Safe mode: {'Yes' if status['safe_mode'] else 'No'}") + if status["systemd_active"]: + console.print("Method: systemd timer") + elif status["cron_active"]: + console.print("Method: cron") + else: + console.print("Status: [dim]Disabled[/dim]") + return 0 + + if args.disable: + result = 
scheduler.disable_schedule() + if result["success"]: + self._print_success(result["message"]) + else: + self._print_error(result.get("message", "Failed to disable schedule")) + return 0 if result["success"] else 1 + + if args.enable: + interval = ( + ScheduleInterval(args.interval) if args.interval else ScheduleInterval.WEEKLY + ) + result = scheduler.enable_schedule(interval=interval, safe_mode=True) + if result["success"]: + self._print_success(result["message"]) + else: + self._print_error(result.get("message", "Failed to enable schedule")) + return 0 if result["success"] else 1 + + # Default: show status + status = scheduler.get_status() + console.print("\n[bold cyan]๐Ÿ• Cleanup Schedule Status:[/bold cyan]") + if status["enabled"]: + console.print("Status: [green]Enabled[/green]") + console.print(f"Interval: [yellow]{status['interval']}[/yellow]") + else: + console.print("Status: [dim]Disabled[/dim]") + console.print("\nUse [bold]cortex cleanup schedule --enable[/bold] to enable.") + return 0 + + return 0 + + def _format_size(self, size_bytes: int) -> str: + for unit in ["B", "KB", "MB", "GB", "TB"]: + if size_bytes < 1024.0: + return f"{size_bytes:.2f} {unit}" + size_bytes /= 1024.0 + return f"{size_bytes:.2f} PB" + def show_rich_help(): """Display beautifully formatted help using Rich""" @@ -888,11 +1069,40 @@ def main(): stack_parser.add_argument( "--dry-run", action="store_true", help="Show what would be installed (requires stack name)" ) + # Cache commands cache_parser = subparsers.add_parser("cache", help="Cache operations") cache_subs = cache_parser.add_subparsers(dest="cache_action", help="Cache actions") cache_subs.add_parser("stats", help="Show cache statistics") + # --- Cleanup Command --- + cleanup_parser = subparsers.add_parser("cleanup", help="Optimize disk usage") + cleanup_subs = cleanup_parser.add_subparsers(dest="cleanup_action", help="Cleanup actions") + + # Scan + cleanup_subs.add_parser("scan", help="Scan for reclaimable space") + + # Run + run_parser = cleanup_subs.add_parser("run", help="Execute cleanup") + run_parser.add_argument( + "--safe", action="store_true", default=True, help="Safe cleanup (default)" + ) + run_parser.add_argument("--force", "--all", action="store_true", help="Clean all found items") + run_parser.add_argument("-y", "--yes", action="store_true", help="Skip confirmation") + + # Undo + undo_parser = cleanup_subs.add_parser("undo", help="Restore cleaned files") + undo_parser.add_argument("id", nargs="?", help="ID of item to restore") + + # Schedule + schedule_parser = cleanup_subs.add_parser("schedule", help="Configure automatic cleanup") + schedule_parser.add_argument("--enable", action="store_true", help="Enable scheduled cleanup") + schedule_parser.add_argument("--disable", action="store_true", help="Disable scheduled cleanup") + schedule_parser.add_argument( + "--interval", choices=["daily", "weekly", "monthly"], help="Cleanup interval" + ) + schedule_parser.add_argument("--show", action="store_true", help="Show current schedule") + args = parser.parse_args() if not args.command: @@ -931,6 +1141,8 @@ def main(): return cli.stack(args) elif args.command == "doctor": return cli.doctor() + elif args.command == "cleanup": + return cli.cleanup(args) elif args.command == "cache": if getattr(args, "cache_action", None) == "stats": return cli.cache_stats() diff --git a/cortex/utils/commands.py b/cortex/utils/commands.py index 2a431dbc..9dcce7e1 100644 --- a/cortex/utils/commands.py +++ b/cortex/utils/commands.py @@ -184,11 +184,17 @@ def 
validate_command(command: str, strict: bool = True) -> tuple[bool, str | Non # Strict mode: command must start with allowed prefix if strict: first_word = command.split()[0] - # Handle sudo prefix + # Handle sudo prefix and its options if first_word == "sudo": parts = command.split() - if len(parts) > 1: - first_word = parts[1] + # Skip sudo and any flags (starting with -) + actual_command_index = 1 + while actual_command_index < len(parts) and parts[actual_command_index].startswith("-"): + actual_command_index += 1 + if actual_command_index < len(parts): + first_word = parts[actual_command_index] + else: + return False, "No command found after sudo" if first_word not in ALLOWED_COMMAND_PREFIXES: return False, f"Command '{first_word}' is not in the allowlist" diff --git a/docs/CLEANUP_GUIDE.md b/docs/CLEANUP_GUIDE.md new file mode 100644 index 00000000..650af62e --- /dev/null +++ b/docs/CLEANUP_GUIDE.md @@ -0,0 +1,223 @@ +# Disk Cleanup Guide + +This guide explains how to use Cortex's disk cleanup functionality to reclaim storage space on your system. + +## Overview + +Cortex provides intelligent disk cleanup capabilities that can: + +- **Scan** for reclaimable space across multiple categories +- **Clean** package caches, orphaned packages, temporary files, and old logs +- **Undo** cleanup operations by restoring files from quarantine +- **Schedule** automatic cleanup tasks + +## Quick Start + +```bash +# Scan for cleanup opportunities +cortex cleanup scan + +# Run cleanup (with confirmation) +cortex cleanup run + +# Run cleanup without confirmation (safe mode) +cortex cleanup run --safe --yes +``` + +## Commands + +### Scan + +Identify cleanup opportunities without making any changes: + +```bash +cortex cleanup scan +``` + +**Output example:** + +```text +๐Ÿ’พ Cleanup Opportunities: + +Category Items Size +Package Cache 45 2.5 GB +Orphaned Packages 8 450 MB +Temporary Files 123 380 MB +Old Logs 12 1.2 GB + +Total reclaimable: 4.5 GB +``` + +### Run + +Execute cleanup operations: + +```bash +# Safe mode (default) - with confirmation +cortex cleanup run + +# Safe mode - skip confirmation +cortex cleanup run --safe --yes + +# Force mode - clean all items (use with caution) +cortex cleanup run --force --yes +``` + +**Options:** + +| Option | Description | +|--------|-------------| +| `--safe` | Only perform safe cleanup operations (default) | +| `--force` | Clean all found items including potentially risky ones | +| `-y, --yes` | Skip confirmation prompt | + +### Undo + +Restore files that were cleaned: + +```bash +# List restorable items +cortex cleanup undo + +# Restore a specific item +cortex cleanup undo +``` + +**Example:** + +```text +$ cortex cleanup undo +ID File Size Date +abc123 temp_file.txt 1.2 MB 2024-01-15 10:30 +def456 old_log.log 500 KB 2024-01-15 10:30 + +Run 'cortex cleanup undo ' to restore. 
+ +$ cortex cleanup undo abc123 +โœ“ Restored item abc123 +``` + +### Schedule + +Configure automatic cleanup: + +```bash +# Show current schedule status +cortex cleanup schedule --show + +# Enable weekly cleanup (default) +cortex cleanup schedule --enable + +# Enable daily cleanup +cortex cleanup schedule --enable --interval daily + +# Enable monthly cleanup +cortex cleanup schedule --enable --interval monthly + +# Disable scheduled cleanup +cortex cleanup schedule --disable +``` + +**Supported intervals:** + +| Interval | Description | +|----------|-------------| +| `daily` | Run at 3:00 AM every day | +| `weekly` | Run at 3:00 AM every Sunday | +| `monthly` | Run at 3:00 AM on the 1st of each month | + +## Cleanup Categories + +### Package Cache + +Location: `/var/cache/apt/archives` + +Removes downloaded `.deb` package files that are no longer needed after installation. + +### Orphaned Packages + +Packages that were installed as dependencies but are no longer required by any installed package. + +### Temporary Files + +Location: `/tmp` and `~/.cache` + +Old temporary files (default: older than 7 days). + +### Old Logs + +Location: `/var/log` + +Large log files older than a specified age (default: >100MB and >7 days). + +## Safety Features + +### Quarantine System + +When Cortex cleans files, they are first moved to a quarantine directory (`~/.cortex/trash/`) rather than being permanently deleted. This allows you to restore files using the `undo` command. + +Quarantined files are automatically removed after 30 days. + +### Safe Mode + +The default `--safe` mode ensures that only non-critical files are removed: + +- Package cache (safe to remove) +- Orphaned packages (safe to remove) +- Old temporary files (safe to remove) +- Old logs (compressed, not deleted) + +### Dry Run + +While not directly exposed, the underlying system supports dry-run operations for testing. + +## Scheduling Implementation + +Cortex supports two scheduling backends: + +1. **systemd timers** (preferred) - Used automatically if available +2. **cron** - Fallback option + +Configuration is stored in `~/.cortex/cleanup_schedule.json`. + +## Troubleshooting + +### Permission Denied + +Some cleanup operations require root privileges: + +```bash +# Clean system package cache +sudo cortex cleanup run +``` + +### No Space Reclaimed + +If scan shows reclaimable space but run reports no space freed: + +1. Check if files were already cleaned by another process +2. Verify write permissions to target directories +3. Check system logs for errors + +### Restore Failed + +If `undo` fails to restore a file: + +1. Verify the quarantine file exists in `~/.cortex/trash/` +2. Check if the original path is writable +3. Ensure parent directory exists + +## Configuration + +Default settings for cleanup: + +| Setting | Default | Description | +|---------|---------|-------------| +| Temp file age | 7 days | Minimum age to consider temp files | +| Log min size | 100 MB | Minimum log file size to consider | +| Log age | 7 days | Minimum age for log files | +| Quarantine retention | 30 days | Days before quarantined files are deleted | + +--- + +For more information, visit: [Cortex Cleanup Documentation](https://cortexlinux.com/docs/cleanup) diff --git a/tests/test_cleanup_cleaner.py b/tests/test_cleanup_cleaner.py new file mode 100644 index 00000000..c12c7c29 --- /dev/null +++ b/tests/test_cleanup_cleaner.py @@ -0,0 +1,223 @@ +""" +Tests for Cleanup Cleaner Module. + +Tests for DiskCleaner class. 
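+
+Run with: pytest tests/test_cleanup_cleaner.py -v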
+""" + +import gzip +import tempfile +from pathlib import Path +from unittest.mock import MagicMock, Mock, patch + +import pytest + +from cortex.cleanup.cleaner import DiskCleaner +from cortex.cleanup.scanner import ScanResult + + +class TestDiskCleaner: + """Tests for DiskCleaner class.""" + + @pytest.fixture + def cleaner(self): + """Create a cleaner instance.""" + return DiskCleaner(dry_run=False) + + @pytest.fixture + def dry_run_cleaner(self): + """Create a dry-run cleaner instance.""" + return DiskCleaner(dry_run=True) + + def test_init(self, cleaner): + """Test cleaner initialization.""" + assert cleaner.dry_run is False + assert cleaner.scanner is not None + assert cleaner.manager is not None + + def test_init_dry_run(self, dry_run_cleaner): + """Test dry-run cleaner initialization.""" + assert dry_run_cleaner.dry_run is True + + @patch("cortex.cleanup.cleaner.run_command") + def test_clean_package_cache_success(self, mock_run, cleaner): + """Test clean_package_cache with success.""" + mock_result = Mock() + mock_result.success = True + mock_run.return_value = mock_result + + with patch.object(cleaner.scanner, "scan_package_cache") as mock_scan: + mock_scan.return_value = ScanResult("Package Cache", 1000, 5) + + freed = cleaner.clean_package_cache() + + assert freed == 1000 + mock_run.assert_called_once() + + @patch("cortex.cleanup.cleaner.run_command") + def test_clean_package_cache_failure(self, mock_run, cleaner): + """Test clean_package_cache with failure.""" + mock_result = Mock() + mock_result.success = False + mock_result.stderr = "Permission denied" + mock_run.return_value = mock_result + + with patch.object(cleaner.scanner, "scan_package_cache") as mock_scan: + mock_scan.return_value = ScanResult("Package Cache", 1000, 5) + + freed = cleaner.clean_package_cache() + + assert freed == 0 + + def test_clean_package_cache_dry_run(self, dry_run_cleaner): + """Test clean_package_cache in dry-run mode.""" + with patch.object(dry_run_cleaner.scanner, "scan_package_cache") as mock_scan: + mock_scan.return_value = ScanResult("Package Cache", 5000, 10) + + freed = dry_run_cleaner.clean_package_cache() + + assert freed == 5000 + + @patch("cortex.cleanup.cleaner.run_command") + def test_remove_orphaned_packages_empty(self, mock_run, cleaner): + """Test remove_orphaned_packages with empty list.""" + freed = cleaner.remove_orphaned_packages([]) + + assert freed == 0 + mock_run.assert_not_called() + + @patch("cortex.cleanup.cleaner.run_command") + def test_remove_orphaned_packages_success(self, mock_run, cleaner): + """Test remove_orphaned_packages with success.""" + mock_result = Mock() + mock_result.success = True + mock_result.stdout = "After this operation, 100 MB disk space will be freed." + mock_run.return_value = mock_result + + freed = cleaner.remove_orphaned_packages(["pkg1", "pkg2"]) + + assert freed == 100 * 1024 * 1024 + + def test_remove_orphaned_packages_dry_run(self, dry_run_cleaner): + """Test remove_orphaned_packages in dry-run mode.""" + freed = dry_run_cleaner.remove_orphaned_packages(["pkg1"]) + + assert freed == 0 # Dry run returns 0 for orphaned packages + + def test_parse_freed_space_mb(self, cleaner): + """Test parsing freed space with MB.""" + stdout = "After this operation, 50 MB disk space will be freed." + + freed = cleaner._parse_freed_space(stdout) + + assert freed == 50 * 1024 * 1024 + + def test_parse_freed_space_kb(self, cleaner): + """Test parsing freed space with KB.""" + stdout = "After this operation, 256 KB disk space will be freed." 
+ + freed = cleaner._parse_freed_space(stdout) + + assert freed == 256 * 1024 + + def test_parse_freed_space_no_match(self, cleaner): + """Test parsing freed space with no match.""" + stdout = "Nothing to do." + + freed = cleaner._parse_freed_space(stdout) + + assert freed == 0 + + def test_clean_temp_files_nonexistent(self, cleaner): + """Test clean_temp_files with nonexistent files.""" + files = ["/nonexistent/file1.tmp", "/nonexistent/file2.tmp"] + + freed = cleaner.clean_temp_files(files) + + # Should not raise, just skip + assert freed == 0 + + def test_clean_temp_files_dry_run(self, dry_run_cleaner, tmp_path): + """Test clean_temp_files in dry-run mode.""" + # Create temp files + file1 = tmp_path / "temp1.txt" + file2 = tmp_path / "temp2.txt" + file1.write_bytes(b"x" * 100) + file2.write_bytes(b"x" * 200) + + freed = dry_run_cleaner.clean_temp_files([str(file1), str(file2)]) + + assert freed == 300 + # Files should still exist (dry run) + assert file1.exists() + assert file2.exists() + + def test_compress_logs_nonexistent(self, cleaner): + """Test compress_logs with nonexistent files.""" + files = ["/nonexistent/log1.log", "/nonexistent/log2.log"] + + freed = cleaner.compress_logs(files) + + assert freed == 0 + + def test_compress_logs_success(self, cleaner, tmp_path): + """Test compress_logs with actual files.""" + log_file = tmp_path / "test.log" + log_content = b"This is a test log " * 1000 # Compressible content + log_file.write_bytes(log_content) + original_size = log_file.stat().st_size # noqa: F841 - used for documentation + + freed = cleaner.compress_logs([str(log_file)]) + + # Original should be gone + assert not log_file.exists() + # Compressed should exist + gz_file = tmp_path / "test.log.gz" + assert gz_file.exists() + # Should have freed some space + assert freed > 0 + + def test_compress_logs_dry_run(self, dry_run_cleaner, tmp_path): + """Test compress_logs in dry-run mode.""" + log_file = tmp_path / "test.log" + log_file.write_bytes(b"x" * 1000) + + freed = dry_run_cleaner.compress_logs([str(log_file)]) + + # Should estimate 90% reduction + assert freed == int(1000 * 0.9) + # File should still exist (dry run) + assert log_file.exists() + + def test_run_cleanup_all_categories(self, cleaner): + """Test run_cleanup with all categories (non-safe mode).""" + scan_results = [ + ScanResult("Package Cache", 1000, 5, []), + ScanResult("Orphaned Packages", 2000, 3, ["pkg1"]), + ScanResult("Temporary Files", 500, 2, [str(Path(tempfile.gettempdir()) / "f1")]), + ScanResult("Old Logs", 800, 1, ["/var/log/old.log"]), + ] + + with ( + patch.object(cleaner, "clean_package_cache", return_value=1000), + patch.object(cleaner, "remove_orphaned_packages", return_value=2000), + patch.object(cleaner, "clean_temp_files", return_value=500), + patch.object(cleaner, "compress_logs", return_value=800), + ): + # Use safe=False to include orphaned packages + summary = cleaner.run_cleanup(scan_results, safe=False) + + assert summary["Package Cache"] == 1000 + assert summary["Orphaned Packages"] == 2000 + assert summary["Temporary Files"] == 500 + assert summary["Old Logs"] == 800 + + def test_run_cleanup_empty(self, cleaner): + """Test run_cleanup with empty results.""" + summary = cleaner.run_cleanup([]) + + assert summary["Package Cache"] == 0 + assert summary["Orphaned Packages"] == 0 + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/tests/test_cleanup_manager.py b/tests/test_cleanup_manager.py new file mode 100644 index 00000000..7d5a0c45 --- /dev/null +++ 
b/tests/test_cleanup_manager.py @@ -0,0 +1,219 @@ +""" +Tests for Cleanup Manager Module. + +Tests for CleanupManager class and QuarantineItem dataclass. +""" + +import json +import tempfile +import time +from pathlib import Path +from unittest.mock import patch + +import pytest + +from cortex.cleanup.manager import CleanupManager, QuarantineItem + + +class TestQuarantineItem: + """Tests for QuarantineItem dataclass.""" + + def test_create_item(self): + """Test creating a quarantine item.""" + temp_path = str(Path(tempfile.gettempdir()) / "test.txt") + item = QuarantineItem( + id="abc123", + original_path=temp_path, + quarantine_path="/home/user/.cortex/trash/abc123_test.txt", + timestamp=1234567890.0, + size_bytes=1024, + ) + + assert item.id == "abc123" + assert item.original_path == temp_path + assert item.size_bytes == 1024 + + +class TestCleanupManager: + """Tests for CleanupManager class.""" + + @pytest.fixture + def manager(self, tmp_path): + """Create a manager instance with temp quarantine directory.""" + with patch.object(CleanupManager, "__init__", lambda self: None): + mgr = CleanupManager.__new__(CleanupManager) + mgr.quarantine_dir = tmp_path / "trash" + mgr.metadata_file = mgr.quarantine_dir / "metadata.json" + mgr._ensure_dir() + return mgr + + def test_ensure_dir(self, manager): + """Test directory creation.""" + assert manager.quarantine_dir.exists() + + def test_load_metadata_empty(self, manager): + """Test loading metadata when file doesn't exist.""" + metadata = manager._load_metadata() + + assert metadata == {} + + def test_save_and_load_metadata(self, manager): + """Test saving and loading metadata.""" + test_data = { + "item1": {"id": "item1", "path": "/test"}, + "item2": {"id": "item2", "path": "/test2"}, + } + + manager._save_metadata(test_data) + loaded = manager._load_metadata() + + assert loaded == test_data + + def test_load_metadata_invalid_json(self, manager): + """Test loading invalid JSON metadata.""" + manager.metadata_file.write_text("not valid json") + + metadata = manager._load_metadata() + + assert metadata == {} + + def test_quarantine_file_success(self, manager, tmp_path): + """Test quarantining a file successfully.""" + # Create a test file + test_file = tmp_path / "to_quarantine.txt" + test_file.write_text("test content") + + item_id = manager.quarantine_file(str(test_file)) + + assert item_id is not None + assert len(item_id) == 8 + assert not test_file.exists() # Original moved + + # Check metadata + metadata = manager._load_metadata() + assert item_id in metadata + + def test_quarantine_file_nonexistent(self, manager): + """Test quarantining a nonexistent file.""" + item_id = manager.quarantine_file("/nonexistent/file.txt") + + assert item_id is None + + def test_restore_item_success(self, manager, tmp_path): + """Test restoring a quarantined item successfully.""" + # First quarantine a file + test_file = tmp_path / "to_restore.txt" + test_file.write_text("restore me") + + item_id = manager.quarantine_file(str(test_file)) + assert not test_file.exists() + + # Now restore it + success = manager.restore_item(item_id) + + assert success is True + assert test_file.exists() + assert test_file.read_text() == "restore me" + + def test_restore_item_not_found(self, manager): + """Test restoring a nonexistent item.""" + success = manager.restore_item("nonexistent_id") + + assert success is False + + def test_restore_item_missing_quarantine_file(self, manager, tmp_path): + """Test restoring when quarantine file is missing.""" + # Create metadata without 
actual file + metadata = { + "fake_id": { + "id": "fake_id", + "original_path": str(tmp_path / "original.txt"), + "quarantine_path": str(manager.quarantine_dir / "missing.txt"), + "timestamp": time.time(), + "size_bytes": 100, + } + } + manager._save_metadata(metadata) + + success = manager.restore_item("fake_id") + + assert success is False + + def test_list_items_empty(self, manager): + """Test listing items when empty.""" + items = manager.list_items() + + assert items == [] + + def test_list_items_sorted(self, manager, tmp_path): + """Test listing items sorted by timestamp.""" + # Create and quarantine multiple files + file1 = tmp_path / "file1.txt" + file2 = tmp_path / "file2.txt" + file1.write_text("1") + file2.write_text("2") + + id1 = manager.quarantine_file(str(file1)) + time.sleep(0.1) + id2 = manager.quarantine_file(str(file2)) + + items = manager.list_items() + + assert len(items) == 2 + # Most recent should be first + assert items[0].id == id2 + assert items[1].id == id1 + + def test_cleanup_old_items_none_expired(self, manager, tmp_path): + """Test cleanup when no items are expired.""" + # Quarantine a file + test_file = tmp_path / "fresh.txt" + test_file.write_text("fresh") + _ = manager.quarantine_file(str(test_file)) + + manager.cleanup_old_items(days=30) + + # Item should still exist + items = manager.list_items() + assert len(items) == 1 + + def test_cleanup_old_items_expired(self, manager, tmp_path): + """Test cleanup of expired items.""" + # Create metadata with old timestamp + old_time = time.time() - (40 * 86400) # 40 days ago + quarantine_file = manager.quarantine_dir / "old_file.txt" + quarantine_file.write_text("old") + + metadata = { + "old_id": { + "id": "old_id", + "original_path": str(tmp_path / "original.txt"), + "quarantine_path": str(quarantine_file), + "timestamp": old_time, + "size_bytes": 100, + } + } + manager._save_metadata(metadata) + + manager.cleanup_old_items(days=30) + + # Item should be removed + items = manager.list_items() + assert len(items) == 0 + assert not quarantine_file.exists() + + def test_quarantine_preserves_filename(self, manager, tmp_path): + """Test that quarantine preserves original filename.""" + test_file = tmp_path / "important_file.txt" + test_file.write_text("important") + + item_id = manager.quarantine_file(str(test_file)) + + metadata = manager._load_metadata() + quarantine_path = Path(metadata[item_id]["quarantine_path"]) + + assert "important_file.txt" in quarantine_path.name + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/tests/test_cleanup_scanner.py b/tests/test_cleanup_scanner.py new file mode 100644 index 00000000..ecc34f34 --- /dev/null +++ b/tests/test_cleanup_scanner.py @@ -0,0 +1,232 @@ +""" +Tests for Cleanup Scanner Module. + +Tests for CleanupScanner class and ScanResult dataclass. 
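+
+Run with: pytest tests/test_cleanup_scanner.py -v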
+""" + +import tempfile +import time +from pathlib import Path +from unittest.mock import MagicMock, Mock, patch + +import pytest + +from cortex.cleanup.scanner import CleanupScanner, ScanResult + + +class TestScanResult: + """Tests for ScanResult dataclass.""" + + def test_default_values(self): + """Test default values.""" + result = ScanResult(category="Test Category", size_bytes=1024, count=5) + + assert result.category == "Test Category" + assert result.size_bytes == 1024 + assert result.count == 5 + assert result.items == [] + + def test_with_items(self): + """Test with items list.""" + temp_dir = tempfile.gettempdir() + items = [f"{temp_dir}/file1", f"{temp_dir}/file2"] + result = ScanResult(category="Temp Files", size_bytes=2048, count=2, items=items) + + assert result.items == items + assert len(result.items) == 2 + + +class TestCleanupScanner: + """Tests for CleanupScanner class.""" + + @pytest.fixture + def scanner(self): + """Create a scanner instance.""" + return CleanupScanner() + + def test_init(self, scanner): + """Test scanner initialization.""" + assert scanner.apt_cache_dir == Path("/var/cache/apt/archives") + assert scanner.log_dir == Path("/var/log") + assert len(scanner.temp_dirs) == 2 + + def test_scan_all_returns_list(self, scanner): + """Test scan_all returns a list of results.""" + with ( + patch.object(scanner, "scan_package_cache") as mock_pkg, + patch.object(scanner, "scan_orphaned_packages") as mock_orphan, + patch.object(scanner, "scan_temp_files") as mock_temp, + patch.object(scanner, "scan_logs") as mock_logs, + ): + mock_pkg.return_value = ScanResult("Package Cache", 0, 0) + mock_orphan.return_value = ScanResult("Orphaned Packages", 0, 0) + mock_temp.return_value = ScanResult("Temporary Files", 0, 0) + mock_logs.return_value = ScanResult("Old Logs", 0, 0) + + results = scanner.scan_all() + + assert len(results) == 4 + assert all(isinstance(r, ScanResult) for r in results) + + def test_scan_package_cache_no_dir(self, scanner): + """Test scan_package_cache when directory doesn't exist.""" + scanner.apt_cache_dir = Path("/nonexistent/path") + + result = scanner.scan_package_cache() + + assert result.category == "Package Cache" + assert result.size_bytes == 0 + assert result.count == 0 + + def test_scan_package_cache_with_files(self, scanner, tmp_path): + """Test scan_package_cache with actual files.""" + # Create temp directory with .deb files + scanner.apt_cache_dir = tmp_path + + deb1 = tmp_path / "package1.deb" + deb2 = tmp_path / "package2.deb" + deb1.write_bytes(b"x" * 1000) + deb2.write_bytes(b"x" * 2000) + + result = scanner.scan_package_cache() + + assert result.category == "Package Cache" + assert result.size_bytes == 3000 + assert result.count == 2 + assert len(result.items) == 2 + + @patch("cortex.cleanup.scanner.run_command") + def test_scan_orphaned_packages_success(self, mock_run, scanner): + """Test scan_orphaned_packages with successful command.""" + mock_result = Mock() + mock_result.success = True + mock_result.stdout = """Reading package lists... +The following packages will be REMOVED: + package1 package2 package3 +After this operation, 50.5 MB disk space will be freed. 
+""" + mock_run.return_value = mock_result + + result = scanner.scan_orphaned_packages() + + assert result.category == "Orphaned Packages" + assert result.count == 3 + assert "package1" in result.items + assert result.size_bytes == int(50.5 * 1024 * 1024) + + @patch("cortex.cleanup.scanner.run_command") + def test_scan_orphaned_packages_no_packages(self, mock_run, scanner): + """Test scan_orphaned_packages with no orphaned packages.""" + mock_result = Mock() + mock_result.success = True + mock_result.stdout = "0 upgraded, 0 newly installed, 0 to remove." + mock_run.return_value = mock_result + + result = scanner.scan_orphaned_packages() + + assert result.count == 0 + assert result.size_bytes == 0 + + @patch("cortex.cleanup.scanner.run_command") + def test_scan_orphaned_packages_failure(self, mock_run, scanner): + """Test scan_orphaned_packages when command fails.""" + mock_result = Mock() + mock_result.success = False + mock_result.stdout = "" + mock_run.return_value = mock_result + + result = scanner.scan_orphaned_packages() + + assert result.count == 0 + assert result.size_bytes == 0 + + def test_scan_temp_files_empty(self, scanner, tmp_path): + """Test scan_temp_files with no old files.""" + scanner.temp_dirs = [tmp_path] + + # Create a new file (not old enough) + new_file = tmp_path / "new_file.txt" + new_file.write_text("new content") + + result = scanner.scan_temp_files(days_old=7) + + assert result.category == "Temporary Files" + assert result.count == 0 + + def test_scan_temp_files_with_old_files(self, scanner, tmp_path): + """Test scan_temp_files with old files.""" + scanner.temp_dirs = [tmp_path] + + old_file = tmp_path / "old_file.txt" + old_file.write_bytes(b"x" * 500) + + # Modify mtime to be old + old_time = time.time() - (10 * 86400) # 10 days ago + import os + + os.utime(old_file, (old_time, old_time)) + + result = scanner.scan_temp_files(days_old=7) + + assert result.count == 1 + assert result.size_bytes == 500 + + def test_scan_logs_no_dir(self, scanner): + """Test scan_logs when log directory doesn't exist.""" + scanner.log_dir = Path("/nonexistent/log/path") + + result = scanner.scan_logs() + + assert result.category == "Old Logs" + assert result.count == 0 + + def test_scan_logs_with_files(self, scanner, tmp_path): + """Test scan_logs with log files.""" + scanner.log_dir = tmp_path + + # Create an old log file (size threshold is controlled via min_size_mb) + log_file = tmp_path / "test.log" + log_file.write_bytes(b"x" * (2 * 1024 * 1024)) # 2 MB + + old_time = time.time() - (10 * 86400) + import os + + os.utime(log_file, (old_time, old_time)) + + result = scanner.scan_logs(min_size_mb=1, days_old=7) + + assert result.count == 1 + assert result.size_bytes == 2 * 1024 * 1024 + + def test_parse_autoremove_output_kb(self, scanner): + """Test parsing autoremove output with KB units.""" + output = "After this operation, 512 KB disk space will be freed." + + _, size = scanner._parse_autoremove_output(output) + + assert size == 512 * 1024 + + def test_parse_autoremove_output_gb(self, scanner): + """Test parsing autoremove output with GB units.""" + output = "After this operation, 1.5 GB disk space will be freed." 
+ + _, size = scanner._parse_autoremove_output(output) + + assert size == int(1.5 * 1024 * 1024 * 1024) + + def test_parse_autoremove_output_with_packages(self, scanner): + """Test parsing autoremove output with package list.""" + output = """The following packages will be REMOVED: + pkg1 pkg2 pkg3 +After this operation, 100 MB disk space will be freed.""" + + packages, _ = scanner._parse_autoremove_output(output) + + assert "pkg1" in packages + assert "pkg2" in packages + assert "pkg3" in packages + assert len(packages) == 3 + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/tests/test_cleanup_scheduler.py b/tests/test_cleanup_scheduler.py new file mode 100644 index 00000000..6eccc601 --- /dev/null +++ b/tests/test_cleanup_scheduler.py @@ -0,0 +1,277 @@ +""" +Tests for Cleanup Scheduler Module. + +Tests for CleanupScheduler class and ScheduleConfig dataclass. +""" + +import json +import subprocess +from pathlib import Path +from unittest.mock import MagicMock, Mock, patch + +import pytest + +from cortex.cleanup.scheduler import ( + CleanupScheduler, + ScheduleConfig, + ScheduleInterval, +) + + +class TestScheduleInterval: + """Tests for ScheduleInterval enum.""" + + def test_values(self): + """Test enum values.""" + assert ScheduleInterval.DAILY.value == "daily" + assert ScheduleInterval.WEEKLY.value == "weekly" + assert ScheduleInterval.MONTHLY.value == "monthly" + + +class TestScheduleConfig: + """Tests for ScheduleConfig dataclass.""" + + def test_default_values(self): + """Test default values.""" + config = ScheduleConfig() + + assert config.enabled is False + assert config.interval == ScheduleInterval.WEEKLY + assert config.safe_mode is True + assert config.last_run is None + + def test_to_dict(self): + """Test serialization to dict.""" + config = ScheduleConfig( + enabled=True, interval=ScheduleInterval.DAILY, safe_mode=False, last_run=1234567890.0 + ) + + data = config.to_dict() + + assert data["enabled"] is True + assert data["interval"] == "daily" + assert data["safe_mode"] is False + assert data["last_run"] is not None # Check existence, not exact value + + def test_from_dict(self): + """Test deserialization from dict.""" + data = {"enabled": True, "interval": "monthly", "safe_mode": True, "last_run": 9876543210.0} + + config = ScheduleConfig.from_dict(data) + + assert config.enabled is True + assert config.interval == ScheduleInterval.MONTHLY + assert config.safe_mode is True + assert config.last_run is not None # Check existence, not exact value + + def test_from_dict_defaults(self): + """Test from_dict with missing keys uses defaults.""" + data = {} + + config = ScheduleConfig.from_dict(data) + + assert config.enabled is False + assert config.interval == ScheduleInterval.WEEKLY + + +class TestCleanupScheduler: + """Tests for CleanupScheduler class.""" + + @pytest.fixture + def scheduler(self, tmp_path): + """Create a scheduler instance with temp config directory.""" + with patch.object(CleanupScheduler, "__init__", lambda self: None): + sched = CleanupScheduler.__new__(CleanupScheduler) + sched.config_dir = tmp_path / ".cortex" + sched.config_file = sched.config_dir / "cleanup_schedule.json" + sched._ensure_config_dir() + return sched + + def test_ensure_config_dir(self, scheduler): + """Test config directory creation.""" + assert scheduler.config_dir.exists() + + def test_load_config_no_file(self, scheduler): + """Test loading config when file doesn't exist.""" + config = scheduler.load_config() + + assert config.enabled is False + assert 
config.interval == ScheduleInterval.WEEKLY + + def test_save_and_load_config(self, scheduler): + """Test saving and loading config.""" + config = ScheduleConfig(enabled=True, interval=ScheduleInterval.DAILY, safe_mode=True) + + scheduler.save_config(config) + loaded = scheduler.load_config() + + assert loaded.enabled is True + assert loaded.interval == ScheduleInterval.DAILY + + def test_load_config_invalid_json(self, scheduler): + """Test loading invalid JSON config.""" + scheduler.config_file.write_text("not valid json") + + config = scheduler.load_config() + + assert config.enabled is False # Default + + def test_get_status_disabled(self, scheduler): + """Test get_status when disabled.""" + with ( + patch.object(scheduler, "_check_systemd_timer", return_value=False), + patch.object(scheduler, "_check_cron", return_value=False), + ): + status = scheduler.get_status() + + assert status["enabled"] is False + assert status["interval"] is None + + def test_get_status_enabled(self, scheduler): + """Test get_status when enabled.""" + config = ScheduleConfig(enabled=True, interval=ScheduleInterval.DAILY) + scheduler.save_config(config) + + with ( + patch.object(scheduler, "_check_systemd_timer", return_value=True), + patch.object(scheduler, "_check_cron", return_value=False), + ): + status = scheduler.get_status() + + assert status["enabled"] is True + assert status["interval"] == "daily" + assert status["systemd_active"] is True + + def test_get_interval_calendar(self, scheduler): + """Test systemd OnCalendar generation.""" + daily = scheduler._get_interval_calendar(ScheduleInterval.DAILY) + weekly = scheduler._get_interval_calendar(ScheduleInterval.WEEKLY) + monthly = scheduler._get_interval_calendar(ScheduleInterval.MONTHLY) + + assert "03:00:00" in daily + assert "Sun" in weekly + assert "*-*-01" in monthly + + def test_get_cron_schedule(self, scheduler): + """Test cron schedule generation.""" + daily = scheduler._get_cron_schedule(ScheduleInterval.DAILY) + weekly = scheduler._get_cron_schedule(ScheduleInterval.WEEKLY) + monthly = scheduler._get_cron_schedule(ScheduleInterval.MONTHLY) + + assert daily == "0 3 * * *" + assert weekly == "0 3 * * 0" + assert monthly == "0 3 1 * *" + + @patch("subprocess.run") + def test_enable_schedule_systemd_success(self, mock_run, scheduler, tmp_path): + """Test enable_schedule with systemd success.""" + # Mock systemctl commands + mock_run.return_value = MagicMock(returncode=0) + + # Mock systemd user directory (used via Path.home() patch) + _ = tmp_path / ".config" / "systemd" / "user" # Path for reference + with patch.object(Path, "home", return_value=tmp_path): + result = scheduler.enable_schedule(interval=ScheduleInterval.WEEKLY, safe_mode=True) + + assert result["success"] is True + assert result["method"] == "systemd" + + @patch("subprocess.run") + def test_enable_schedule_fallback_to_cron(self, mock_run, scheduler): + """Test enable_schedule falls back to cron when systemd fails.""" + + def side_effect(*args, **kwargs): + if "is-system-running" in args[0]: + return MagicMock(returncode=2) # Not available + elif "crontab" in args[0]: + if "-l" in args[0]: + return MagicMock(returncode=0, stdout="") + else: + return MagicMock(returncode=0) + return MagicMock(returncode=0) + + mock_run.side_effect = side_effect + + result = scheduler.enable_schedule() + + assert result["success"] is True + assert result["method"] == "cron" + + @patch("subprocess.run") + def test_disable_schedule(self, mock_run, scheduler): + """Test disable_schedule.""" + 
mock_run.return_value = MagicMock(returncode=0, stdout="") + + # First enable + config = ScheduleConfig(enabled=True) + scheduler.save_config(config) + + result = scheduler.disable_schedule() + + assert result["success"] is True + + # Check config is disabled + loaded = scheduler.load_config() + assert loaded.enabled is False + + @patch("subprocess.run") + def test_check_systemd_timer_active(self, mock_run, scheduler): + """Test checking systemd timer when active.""" + mock_run.return_value = MagicMock(returncode=0, stdout="active\n") + + active = scheduler._check_systemd_timer() + + assert active is True + + @patch("subprocess.run") + def test_check_systemd_timer_inactive(self, mock_run, scheduler): + """Test checking systemd timer when inactive.""" + mock_run.return_value = MagicMock(returncode=1, stdout="inactive\n") + + active = scheduler._check_systemd_timer() + + assert active is False + + @patch("subprocess.run") + def test_check_cron_exists(self, mock_run, scheduler): + """Test checking cron when entry exists.""" + mock_run.return_value = MagicMock( + returncode=0, + stdout="0 3 * * 0 /usr/bin/env cortex cleanup run --safe --yes # cortex-cleanup\n", + ) + + exists = scheduler._check_cron() + + assert exists is True + + @patch("subprocess.run") + def test_check_cron_not_exists(self, mock_run, scheduler): + """Test checking cron when entry doesn't exist.""" + mock_run.return_value = MagicMock(returncode=0, stdout="# other cron entry\n") + + exists = scheduler._check_cron() + + assert exists is False + + @patch("subprocess.run") + def test_setup_cron_success(self, mock_run, scheduler): + """Test setting up cron job.""" + mock_run.return_value = MagicMock(returncode=0, stdout="") + + result = scheduler._setup_cron(ScheduleInterval.WEEKLY) + + assert result["success"] is True + + @patch("subprocess.run") + def test_setup_cron_timeout(self, mock_run, scheduler): + """Test cron setup with timeout.""" + mock_run.side_effect = subprocess.TimeoutExpired(cmd="crontab", timeout=10) + + result = scheduler._setup_cron(ScheduleInterval.WEEKLY) + + assert result["success"] is False + assert "timed out" in result["error"] + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])
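Reviewer note: the manager tests above pin down the quarantine contract -- an 8-character item id, the original filename preserved in the quarantine path, metadata persisted as JSON, and {} returned for a missing or corrupt metadata file -- but the implementation itself (cortex/cleanup/manager.py) is not part of this diff. The following is a minimal sketch consistent with those assertions, for orientation only; the class name, the uuid-based id, and the use of shutil.move are assumptions, not the shipped code.

# Hypothetical sketch only; everything not asserted by the tests
# (CleanupManagerSketch, uuid ids, shutil.move, JSON layout) is assumed.
import json
import shutil
import time
import uuid
from pathlib import Path
from typing import Optional


class CleanupManagerSketch:
    def __init__(self, quarantine_dir: Path):
        self.quarantine_dir = quarantine_dir
        self.metadata_file = quarantine_dir / "metadata.json"
        self.quarantine_dir.mkdir(parents=True, exist_ok=True)

    def _load_metadata(self) -> dict:
        # Tests require {} for a missing or unparseable metadata file.
        try:
            return json.loads(self.metadata_file.read_text())
        except (OSError, json.JSONDecodeError):
            return {}

    def _save_metadata(self, metadata: dict) -> None:
        self.metadata_file.write_text(json.dumps(metadata))

    def quarantine_file(self, path: str) -> Optional[str]:
        src = Path(path)
        if not src.exists():
            return None  # test_quarantine_file_nonexistent
        item_id = uuid.uuid4().hex[:8]  # 8-char id per test_quarantine_file_success
        # Keep the original filename per test_quarantine_preserves_filename.
        dest = self.quarantine_dir / f"{item_id}_{src.name}"
        size = src.stat().st_size
        shutil.move(str(src), str(dest))
        metadata = self._load_metadata()
        metadata[item_id] = {
            "id": item_id,
            "original_path": str(src),
            "quarantine_path": str(dest),
            "timestamp": time.time(),
            "size_bytes": size,
        }
        self._save_metadata(metadata)
        return item_id

    def restore_item(self, item_id: str) -> bool:
        metadata = self._load_metadata()
        entry = metadata.get(item_id)
        if entry is None or not Path(entry["quarantine_path"]).exists():
            return False  # covers the not-found and missing-file tests
        shutil.move(entry["quarantine_path"], entry["original_path"])
        del metadata[item_id]
        self._save_metadata(metadata)
        return True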
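The _parse_autoremove_output tests fully specify the observable behavior: indented lines following "will be REMOVED" are package names, and "After this operation, N UNIT disk space will be freed." converts to bytes with binary multipliers (KB = 1024, and so on). A free-function sketch of that parse, assuming the function name and loop structure (the real method is private to CleanupScanner):

# Hypothetical sketch of the (packages, size) parse the tests assert.
UNIT_MULTIPLIERS = {"KB": 1024, "MB": 1024**2, "GB": 1024**3}


def parse_autoremove_output(stdout: str) -> tuple[list[str], int]:
    packages: list[str] = []
    size = 0
    in_package_list = False
    for line in stdout.splitlines():
        if "will be REMOVED" in line:
            in_package_list = True
            continue
        if in_package_list and line.startswith(" "):
            # apt indents the package list under the REMOVED header.
            packages.extend(line.split())
            continue
        in_package_list = False
        if "disk space will be freed" in line:
            # e.g. "After this operation, 50.5 MB disk space will be freed."
            parts = line.split()
            for unit, mult in UNIT_MULTIPLIERS.items():
                if unit in parts:
                    size = int(float(parts[parts.index(unit) - 1]) * mult)
    return packages, size

Against the fixtures above this yields 512 * 1024 for "512 KB", int(1.5 * 1024**3) for "1.5 GB", and ["pkg1", "pkg2", "pkg3"] for the indented package line, matching the assertions exactly.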
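The scan_temp_files tests imply a simple mtime cutoff: a freshly written file is not counted at days_old=7, while a file back-dated ten days with os.utime is counted with its full size. A standalone sketch of that filter, with the function name and recursive walk assumed:

# Hypothetical sketch of the mtime-based age filter the tests imply.
import time
from pathlib import Path


def scan_old_files(dirs: list[Path], days_old: int = 7) -> tuple[int, int]:
    cutoff = time.time() - days_old * 86400
    count = 0
    size = 0
    for directory in dirs:
        if not directory.exists():
            continue
        for path in directory.rglob("*"):
            try:
                stat = path.stat()
            except OSError:
                continue  # skip files that vanish mid-scan
            if path.is_file() and stat.st_mtime < cutoff:
                count += 1
                size += stat.st_size
    return count, size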
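The scheduler tests fix the cron expressions exactly (03:00 daily, Sundays for weekly, the 1st for monthly) but only assert substrings of the systemd OnCalendar values, so the full OnCalendar strings below are plausible reconstructions rather than confirmed output:

# Cron expressions are pinned verbatim by test_get_cron_schedule;
# the OnCalendar values are assumptions consistent with
# test_get_interval_calendar's substring checks.
from enum import Enum


class ScheduleInterval(Enum):
    DAILY = "daily"
    WEEKLY = "weekly"
    MONTHLY = "monthly"


CRON_SCHEDULES = {
    ScheduleInterval.DAILY: "0 3 * * *",    # 03:00 every day
    ScheduleInterval.WEEKLY: "0 3 * * 0",   # 03:00 every Sunday
    ScheduleInterval.MONTHLY: "0 3 1 * *",  # 03:00 on the 1st
}

ON_CALENDAR = {
    ScheduleInterval.DAILY: "*-*-* 03:00:00",
    ScheduleInterval.WEEKLY: "Sun *-*-* 03:00:00",
    ScheduleInterval.MONTHLY: "*-*-01 03:00:00",
}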
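Finally, test_enable_schedule_fallback_to_cron encodes the fallback protocol: probe systemd via `systemctl is-system-running` (the mock's returncode 2 means unavailable), and if that fails, read the current crontab with `crontab -l` and install a tagged entry. A sketch of that control flow, assuming the helper names and omitting the unit-file details, which this diff does not show:

# Hypothetical control flow for the systemd-to-cron fallback.
import subprocess


def systemd_available() -> bool:
    # returncode 2 in the test's side_effect marks systemd as unavailable.
    result = subprocess.run(
        ["systemctl", "is-system-running"],
        capture_output=True, text=True, timeout=10,
    )
    return result.returncode == 0


def enable_schedule_sketch(cron_line: str) -> dict:
    if systemd_available():
        # ... write the .timer/.service units and enable them here ...
        return {"success": True, "method": "systemd"}
    # Fall back to cron: read the current table, append a tagged entry
    # (the "# cortex-cleanup" marker is what _check_cron greps for).
    current = subprocess.run(
        ["crontab", "-l"], capture_output=True, text=True, timeout=10
    ).stdout
    new_table = current + cron_line + " # cortex-cleanup\n"
    proc = subprocess.run(
        ["crontab", "-"], input=new_table, capture_output=True, text=True, timeout=10
    )
    return {"success": proc.returncode == 0, "method": "cron"}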