Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions cortex/cleanup/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""
Cleanup module for Cortex.

This module provides disk cleanup functionality including:
- Scanning for cleanup opportunities (package cache, orphaned packages, temp files, logs)
- Executing cleanup operations with undo capability
- Managing quarantined files for safe recovery
- Scheduling automatic cleanup tasks
"""

from cortex.cleanup.cleaner import DiskCleaner
from cortex.cleanup.manager import CleanupManager, QuarantineItem
from cortex.cleanup.scanner import CleanupScanner, ScanResult

# Public API of the cleanup package: exactly the names imported above from
# the scanner, cleaner, and manager submodules.
__all__ = [
    "CleanupScanner",
    "ScanResult",
    "DiskCleaner",
    "CleanupManager",
    "QuarantineItem",
]
267 changes: 267 additions & 0 deletions cortex/cleanup/cleaner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
import gzip
import logging
import re
import shutil
from pathlib import Path
from typing import Optional

from cortex.cleanup.manager import CleanupManager
from cortex.cleanup.scanner import CleanupScanner, ScanResult
from cortex.utils.commands import run_command

logger = logging.getLogger(__name__)

# Category labels shared between the scan results and the cleanup summary.
# Defined once so the comparison sites cannot drift apart.
CATEGORY_PACKAGE_CACHE = "Package Cache"
CATEGORY_ORPHANED_PACKAGES = "Orphaned Packages"
CATEGORY_TEMP_FILES = "Temporary Files"
CATEGORY_OLD_LOGS = "Old Logs"

# Size-unit suffix -> number of bytes, using binary (1024-based) steps.
# Insertion order (KB, MB, GB) is the order in which units are tried.
UNIT_MULTIPLIERS = {
    unit: 1024**power for power, unit in enumerate(("KB", "MB", "GB"), start=1)
}


class DiskCleaner:
    """
    Handles the actual cleanup operations including package cleaning,
    orphaned package removal, temp file deletion, and log compression.
    """

    def __init__(self, dry_run: bool = False) -> None:
        """
        Initialize the DiskCleaner.

        Args:
            dry_run (bool): If True, simulate actions without modifying the filesystem.
        """
        self.dry_run = dry_run
        # The scanner sizes the package cache before it is cleaned; the
        # manager quarantines temp files instead of deleting them outright.
        self.scanner = CleanupScanner()
        self.manager = CleanupManager()
Comment on lines +34 to +43
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Add missing return type hint.

The `__init__` method is missing a return type hint (`-> None`). As per coding guidelines, type hints are required for Python code.

Apply this diff:

-    def __init__(self, dry_run: bool = False):
+    def __init__(self, dry_run: bool = False) -> None:
         """
         Initialize the DiskCleaner.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def __init__(self, dry_run: bool = False):
"""
Initialize the DiskCleaner.
Args:
dry_run (bool): If True, simulate actions without modifying the filesystem.
"""
self.dry_run = dry_run
self.scanner = CleanupScanner()
self.manager = CleanupManager()
def __init__(self, dry_run: bool = False) -> None:
"""
Initialize the DiskCleaner.
Args:
dry_run (bool): If True, simulate actions without modifying the filesystem.
"""
self.dry_run = dry_run
self.scanner = CleanupScanner()
self.manager = CleanupManager()
🤖 Prompt for AI Agents
In cortex/cleanup/cleaner.py around lines 34 to 43, the __init__ method is
missing the return type hint; update the method signature to include "-> None"
(i.e., def __init__(self, dry_run: bool = False) -> None:) and keep the existing
docstring and body unchanged so the constructor conforms to the project's typing
guidelines.


def clean_package_cache(self) -> int:
    """
    Empty the apt package cache by running 'apt-get clean'.

    The cache is sized by the scanner before the command runs, and that
    measurement is what gets reported as the freed space.

    Returns:
        int: Estimated number of bytes freed, or 0 if the command failed.
    """
    # Measure up front: the scan result is the only size figure available
    estimated_bytes = self.scanner.scan_package_cache().size_bytes

    if not self.dry_run:
        # sudo -n runs in non-interactive mode
        outcome = run_command("sudo -n apt-get clean", validate=True)
        if not outcome.success:
            logger.error(f"Failed to clean package cache: {outcome.stderr}")
            return 0

    return estimated_bytes

def remove_orphaned_packages(self, packages: list[str]) -> int:
    """
    Remove orphaned packages by running 'apt-get autoremove'.

    Args:
        packages (List[str]): Orphaned package names. Only checked for
            emptiness; the names themselves are not passed to apt.

    Returns:
        int: Bytes freed as parsed from the command output. 0 for an empty
            list, a dry run, or a failed command.
    """
    # Nothing to remove, or simulating (the scanner estimates that size)
    if not packages or self.dry_run:
        return 0

    # sudo -n runs in non-interactive mode
    outcome = run_command("sudo -n apt-get autoremove -y", validate=True)
    if not outcome.success:
        logger.error(f"Failed to remove orphaned packages: {outcome.stderr}")
        return 0

    return self._parse_freed_space(outcome.stdout)

def _parse_freed_space(self, stdout: str) -> int:
    """
    Find the freed-space summary in apt output and convert it to bytes.

    Args:
        stdout (str): Output from apt command.

    Returns:
        int: Bytes freed, taken from the first line that mentions freed
            disk space; 0 when no such line is present.
    """
    marker = "disk space will be freed"
    # Only the first matching line is considered
    summary_line = next((text for text in stdout.splitlines() if marker in text), None)
    if summary_line is None:
        return 0
    return self._extract_size_from_line(summary_line)

def _extract_size_from_line(self, line: str) -> int:
    """
    Extract size in bytes from a line containing size information.

    Units are tried in the order they appear in UNIT_MULTIPLIERS and are
    matched case-insensitively. Every occurrence of a unit is examined, so
    a unit that merely appears inside a word (e.g. "MB" in "Number") does
    not hide a real size later in the line.

    Args:
        line (str): Line containing size info like "50.5 MB" or "1,234 kB".

    Returns:
        int: Size in bytes, or 0 if no "<number> <unit>" pair is found.
    """
    # Use string operations instead of regex to avoid ReDoS warnings
    line_upper = line.upper()

    for unit, multiplier in UNIT_MULTIPLIERS.items():
        idx = line_upper.find(unit)
        while idx != -1:
            # A unit at position 0 has no room for a number before it
            if idx > 0:
                # Look at most 20 characters back; the number is the last
                # whitespace-separated word before the unit.
                parts = line[max(0, idx - 20):idx].split()
                if parts:
                    try:
                        # Drop "," thousands separators ("1,234" -> 1234)
                        value = float(parts[-1].replace(",", ""))
                        return int(value * multiplier)
                    except (ValueError, OverflowError):
                        # Not a usable number; try the next occurrence
                        pass
            idx = line_upper.find(unit, idx + 1)
    return 0

def clean_temp_files(self, files: list[str]) -> int:
    """
    Clear temporary files by handing each one to the quarantine manager.

    Args:
        files (List[str]): Paths of the temporary files to remove. Paths
            that do not exist are skipped.

    Returns:
        int: Sum of the sizes of the files that were quarantined (or, in a
            dry run, that would have been).
    """
    total = 0

    for raw_path in files:
        target = Path(raw_path)
        if not target.exists():
            continue

        # Size must be read before the file is moved away
        try:
            size = target.stat().st_size
        except OSError:
            size = 0

        # In a dry run nothing is moved; otherwise a falsy quarantine id
        # means the move failed and the file's size is not counted.
        if not self.dry_run and not self.manager.quarantine_file(str(target)):
            logger.warning(f"Failed to quarantine temp file: {target}")
            continue

        total += size

    return total

def compress_logs(self, files: list[str]) -> int:
    """
    Compress log files using gzip.

    Each file is written to "<name>.gz" next to the original, and the
    original is removed once the archive has been written. A file whose
    archive already exists is skipped so that the existing archive is never
    overwritten. If compression fails part-way, the incomplete archive is
    removed and the original is left untouched. Failures are logged and do
    not stop the remaining files from being processed.

    Args:
        files (List[str]): List of log file paths to compress. Paths that
            do not exist are skipped.

    Returns:
        int: Number of bytes freed (original size minus compressed size).
            In a dry run this is an estimate assuming a 90% reduction.
    """
    freed_bytes = 0

    for filepath_str in files:
        filepath = Path(filepath_str)
        if not filepath.exists():
            continue

        # Set only while this call owns a possibly-incomplete archive, so
        # the error handler never deletes an archive it did not create.
        partial_gz = None

        try:
            original_size = filepath.stat().st_size

            if self.dry_run:
                # Estimate compression ratio (e.g. 90% reduction)
                freed_bytes += int(original_size * 0.9)
                continue

            gz_path = filepath.with_suffix(filepath.suffix + ".gz")
            if gz_path.exists():
                # Overwriting would destroy a previously written archive
                logger.warning(f"Skipping {filepath}: {gz_path} already exists")
                continue

            partial_gz = gz_path
            with open(filepath, "rb") as f_in, gzip.open(gz_path, "wb") as f_out:
                shutil.copyfileobj(f_in, f_out)

            compressed_size = gz_path.stat().st_size
            # Remove original only after the archive is fully written
            filepath.unlink()
            partial_gz = None
            freed_bytes += original_size - compressed_size

        except Exception as e:
            # Best effort: one bad file must not abort the whole run
            logger.error(f"Failed to compress {filepath}: {e}")
            if partial_gz is not None:
                try:
                    partial_gz.unlink(missing_ok=True)
                except OSError as cleanup_error:
                    logger.warning(
                        f"Could not remove incomplete archive {partial_gz}: {cleanup_error}"
                    )

    return freed_bytes

def run_cleanup(self, scan_results: list[ScanResult], safe: bool = True) -> dict[str, int]:
    """
    Run cleanup based on scan results.

    Every scan result is processed in order. Bytes freed are accumulated
    per category, so a category that appears more than once in
    ``scan_results`` reports the total rather than only the last value.
    Results whose category is not one of the four known categories are
    still processed but do not appear in the summary.

    Args:
        scan_results (List[ScanResult]): Results from scanner.
        safe (bool): If True, perform safe cleanup (default).

    Returns:
        Dict[str, int]: Summary of bytes freed per category. All four
            known categories are always present, defaulting to 0.
    """
    summary = {
        CATEGORY_PACKAGE_CACHE: 0,
        CATEGORY_ORPHANED_PACKAGES: 0,
        CATEGORY_TEMP_FILES: 0,
        CATEGORY_OLD_LOGS: 0,
    }

    for result in scan_results:
        freed = self._process_category(result, safe)
        if result.category in summary:
            # Accumulate: a later result must not erase an earlier total
            summary[result.category] += freed

    return summary

def _process_category(self, result: ScanResult, safe: bool) -> int:
    """
    Dispatch one scan result to the cleanup routine for its category.

    Args:
        result (ScanResult): Scan result for the category.
        safe (bool): Whether to use safe mode. In safe mode orphaned
            packages are left installed.

    Returns:
        int: Bytes freed, or 0 for an unrecognized category.
    """
    category = result.category

    if category == CATEGORY_PACKAGE_CACHE:
        return self.clean_package_cache()

    if category == CATEGORY_ORPHANED_PACKAGES:
        # Package removal only happens when safe mode is switched off
        if safe:
            return 0
        return self.remove_orphaned_packages(result.items)

    if category == CATEGORY_TEMP_FILES:
        return self.clean_temp_files(result.items)

    if category == CATEGORY_OLD_LOGS:
        return self.compress_logs(result.items)

    return 0
Loading
Loading