diff --git a/README_ACCELERATOR_LIMITS.md b/README_ACCELERATOR_LIMITS.md new file mode 100644 index 00000000..c289dfba --- /dev/null +++ b/README_ACCELERATOR_LIMITS.md @@ -0,0 +1,151 @@ +# Cortex Accelerator-Aware Resource Limits + +A cgroups v2 wrapper for managing GPU, CPU, and memory resources for AI workloads with workload presets. + +## Quick Start + +```bash +# Create a profile with the inference preset and 2 GPUs +cortex limits create inference-job --preset inference --gpus 2 + +# Apply limits to a running process +cortex limits apply inference-job --pid 12345 + +# Get environment variables for new processes +eval $(cortex limits env inference-job) + +# Check status +cortex limits status inference-job +``` + +## Workload Presets + +| Preset | CPU | Memory | GPU | OOM Score | Use Case | +|--------|-----|--------|-----|-----------|----------| +| inference | 4 cores | 32G | 100% | -500 | Low-latency serving | +| training | 16 cores | 128G | 100% | -800 | Long training jobs | +| batch | 8 cores | 64G | 80% | 0 | Background processing | +| interactive | 2 cores | 16G | 50% | -200 | Development | + +## Features + +- **cgroups v2 unified hierarchy support**: Full integration with modern Linux cgroups +- **Workload presets**: Sensible defaults for common AI workload patterns +- **NVIDIA GPU isolation**: Set `CUDA_VISIBLE_DEVICES` automatically +- **OOM score adjustment**: Protect critical AI jobs from the OOM killer +- **CPU quota and weight**: Fine-grained CPU resource control +- **Memory limits**: Hard (max) and soft (high) limits with reclaim triggers +- **User mode delegation**: Works without root when cgroups delegation is enabled + +## Commands + +### Create Profile + +```bash +cortex limits create [options] + +Options: + --preset Workload preset (inference, training, batch, interactive) + --gpus Number of GPUs to allocate + --cpu CPU quota percentage (100 = 1 core) + --memory Memory limit in GB + --oom-adj OOM score adjustment (-1000 to 1000) +``` + +### Apply to Process + +```bash +cortex limits apply --pid +``` + +### Environment Variables + +```bash +# Print exports for shell +cortex limits env + +# Apply to current shell +eval $(cortex limits env ) +``` + +### Status + +```bash +# List all profiles +cortex limits list + +# Show specific profile +cortex limits status + +# Show available presets +cortex limits presets +``` + +### Delete Profile + +```bash +cortex limits delete +``` + +## Environment Variables Generated + +When using `cortex limits env`, the following variables are set: + +| Variable | Description | +|----------|-------------| +| `CUDA_VISIBLE_DEVICES` | NVIDIA GPU visibility | +| `HIP_VISIBLE_DEVICES` | AMD ROCm GPU visibility | +| `ONEAPI_DEVICE_SELECTOR` | Intel oneAPI device selection | +| `OMP_NUM_THREADS` | OpenMP thread count | +| `MKL_NUM_THREADS` | Intel MKL thread count | +| `OPENBLAS_NUM_THREADS` | OpenBLAS thread count | +| `TF_MEMORY_ALLOCATION` | TensorFlow memory hint | +| `PYTORCH_CUDA_ALLOC_CONF` | PyTorch CUDA allocator config | + +## User Mode (Non-Root) + +For non-root usage, enable cgroups delegation: + +```bash +# Check if delegation is enabled +cat /sys/fs/cgroup/user.slice/user-$(id -u).slice/cgroup.subtree_control + +# Enable delegation (as root) +mkdir -p /etc/systemd/system/user@.service.d +cat > /etc/systemd/system/user@.service.d/delegate.conf << EOF +[Service] +Delegate=cpu cpuset io memory pids +EOF +systemctl daemon-reload +``` + +## Architecture + +``` +AcceleratorLimitsManager +├── LimitsDatabase (SQLite storage) +├── CgroupsV2Controller (cgroups interface) +└── OOMScoreManager (OOM score adjustment) + +ResourceLimits (dataclass) +├── CPU: quota, weight, affinity +├── Memory: max, high limits +├── GPU: device IDs, percentage +└── OOM: score adjustment +``` + +## Testing + +```bash +pytest tests/test_accelerator_limits.py -v +``` + +## Files + +- `cortex/kernel_features/accelerator_limits.py` - Main implementation +- `tests/test_accelerator_limits.py` - Unit tests +- `README_ACCELERATOR_LIMITS.md` - This file + +## Related Issues + +- [#222 Accelerator-Aware Resource Limits](https://github.com/cortexlinux/cortex/issues/222) diff --git a/cortex/kernel_features/accelerator_limits.py b/cortex/kernel_features/accelerator_limits.py index 81a5eb3d..c6477f42 100644 --- a/cortex/kernel_features/accelerator_limits.py +++ b/cortex/kernel_features/accelerator_limits.py @@ -2,123 +2,717 @@ """ Cortex Accelerator-Aware Resource Limits -cgroups v2 wrapper for AI workloads. +cgroups v2 wrapper for AI workloads with preset profiles for inference, +training, batch, and interactive workloads. + +Features: +- cgroups v2 unified hierarchy support +- Workload presets with sensible defaults +- NVIDIA GPU isolation via environment variables +- OOM score adjustment for priority +- CPU quota, weight, and affinity +- Memory hard and soft limits +- User mode delegation support """ import os import json import sqlite3 import subprocess +import shutil from pathlib import Path -from dataclasses import dataclass, asdict -from typing import Optional, List, Dict +from dataclasses import dataclass, asdict, field +from typing import Optional, List, Dict, Tuple from enum import Enum +import logging +# Configuration CORTEX_DB = Path.home() / ".cortex/limits.db" CGROUP_ROOT = Path("/sys/fs/cgroup") +CORTEX_CGROUP = "cortex.slice" + +# Set up logging +logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s') +logger = logging.getLogger(__name__) + class WorkloadPreset(Enum): + """Predefined workload configurations optimized for AI tasks.""" INFERENCE = "inference" TRAINING = "training" BATCH = "batch" INTERACTIVE = "interactive" + +# Preset configurations +# CPU values are in percentage (100 = 1 core) PRESETS = { - "inference": {"cpu": 400, "memory_gb": 32, "oom_adj": -500, "gpu_pct": 100}, - "training": {"cpu": 1600, "memory_gb": 128, "oom_adj": -800, "gpu_pct": 100}, - "batch": {"cpu": 800, "memory_gb": 64, "oom_adj": 0, "gpu_pct": 80}, - "interactive": {"cpu": 200, "memory_gb": 16, "oom_adj": -200, "gpu_pct": 50}, + "inference": { + "cpu_quota": 400, # 4 cores + "cpu_weight": 100, # normal priority + "memory_gb": 32, + "memory_high_gb": 28, # soft limit + "oom_score_adj": -500, + "gpu_percent": 100, + "description": "Low-latency serving" + }, + "training": { + "cpu_quota": 1600, # 16 cores + "cpu_weight": 150, # higher priority + "memory_gb": 128, + "memory_high_gb": 120, + "oom_score_adj": -800, + "gpu_percent": 100, + "description": "Long training jobs" + }, + "batch": { + "cpu_quota": 800, # 8 cores + "cpu_weight": 50, # lower priority + "memory_gb": 64, + "memory_high_gb": 56, + "oom_score_adj": 0, + "gpu_percent": 80, + "description": "Background processing" + }, + "interactive": { + "cpu_quota": 200, # 2 cores + "cpu_weight": 120, # slightly higher for responsiveness + "memory_gb": 16, + "memory_high_gb": 14, + "oom_score_adj": -200, + "gpu_percent": 50, + "description": "Development" + }, } + @dataclass class ResourceLimits: + """Resource limit configuration for a workload profile.""" name: str preset: str = "inference" - cpu_quota: float = 400.0 - memory_max: int = 32 * 1024**3 - gpu_ids: List[int] = None - oom_score_adj: int = 0 - + cpu_quota: float = 400.0 # percentage (100 = 1 core) + cpu_weight: int = 100 # 1-10000, default 100 + memory_max: int = 32 * 1024**3 # hard limit in bytes + memory_high: int = 28 * 1024**3 # soft limit in bytes + gpu_ids: List[int] = field(default_factory=list) + gpu_percent: int = 100 + oom_score_adj: int = 0 # -1000 to 1000 + cpu_affinity: List[int] = field(default_factory=list) # specific CPUs + cgroup_path: str = "" + def __post_init__(self): - self.gpu_ids = self.gpu_ids or [] - + if self.gpu_ids is None: + self.gpu_ids = [] + if self.cpu_affinity is None: + self.cpu_affinity = [] + if not self.cgroup_path: + self.cgroup_path = f"{CORTEX_CGROUP}/{self.name}" + @classmethod - def from_preset(cls, name: str, preset: str, gpus: int = 0): + def from_preset(cls, name: str, preset: str, gpus: int = 0) -> 'ResourceLimits': + """Create ResourceLimits from a preset configuration.""" p = PRESETS.get(preset, PRESETS["inference"]) - return cls(name, preset, p["cpu"], int(p["memory_gb"] * 1e9), - list(range(gpus)), p["oom_adj"]) + return cls( + name=name, + preset=preset, + cpu_quota=p["cpu_quota"], + cpu_weight=p["cpu_weight"], + memory_max=int(p["memory_gb"] * 1024**3), + memory_high=int(p["memory_high_gb"] * 1024**3), + gpu_ids=list(range(gpus)) if gpus > 0 else [], + gpu_percent=p["gpu_percent"], + oom_score_adj=p["oom_score_adj"] + ) + + def to_dict(self) -> dict: + """Convert to dictionary for serialization.""" + return asdict(self) + + @classmethod + def from_dict(cls, data: dict) -> 'ResourceLimits': + """Create from dictionary.""" + return cls(**data) class LimitsDatabase: - def __init__(self): - CORTEX_DB.parent.mkdir(parents=True, exist_ok=True) - with sqlite3.connect(CORTEX_DB) as conn: - conn.execute("CREATE TABLE IF NOT EXISTS profiles (name TEXT PRIMARY KEY, config TEXT)") - - def save(self, limits: ResourceLimits): - with sqlite3.connect(CORTEX_DB) as conn: - conn.execute("INSERT OR REPLACE INTO profiles VALUES (?,?)", - (limits.name, json.dumps(asdict(limits)))) - + """SQLite-based storage for resource limit profiles.""" + + def __init__(self, db_path: Path = CORTEX_DB): + self.db_path = db_path + self._init_db() + + def _init_db(self): + """Initialize database schema.""" + self.db_path.parent.mkdir(parents=True, exist_ok=True) + with sqlite3.connect(self.db_path) as conn: + conn.execute(""" + CREATE TABLE IF NOT EXISTS profiles ( + name TEXT PRIMARY KEY, + config TEXT NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """) + conn.execute(""" + CREATE TABLE IF NOT EXISTS applied_pids ( + pid INTEGER, + profile_name TEXT, + applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (pid, profile_name) + ) + """) + + def save(self, limits: ResourceLimits) -> bool: + """Save or update a profile.""" + with sqlite3.connect(self.db_path) as conn: + conn.execute( + "INSERT OR REPLACE INTO profiles (name, config, updated_at) VALUES (?, ?, CURRENT_TIMESTAMP)", + (limits.name, json.dumps(limits.to_dict())) + ) + return True + def get(self, name: str) -> Optional[ResourceLimits]: - with sqlite3.connect(CORTEX_DB) as conn: - row = conn.execute("SELECT config FROM profiles WHERE name=?", (name,)).fetchone() - return ResourceLimits(**json.loads(row[0])) if row else None - - def list_all(self): - with sqlite3.connect(CORTEX_DB) as conn: - return [ResourceLimits(**json.loads(r[0])) for r in conn.execute("SELECT config FROM profiles")] + """Retrieve a profile by name.""" + with sqlite3.connect(self.db_path) as conn: + row = conn.execute( + "SELECT config FROM profiles WHERE name = ?", (name,) + ).fetchone() + if row: + return ResourceLimits.from_dict(json.loads(row[0])) + return None + + def delete(self, name: str) -> bool: + """Delete a profile.""" + with sqlite3.connect(self.db_path) as conn: + conn.execute("DELETE FROM profiles WHERE name = ?", (name,)) + conn.execute("DELETE FROM applied_pids WHERE profile_name = ?", (name,)) + return True + + def list_all(self) -> List[ResourceLimits]: + """List all profiles.""" + with sqlite3.connect(self.db_path) as conn: + rows = conn.execute("SELECT config FROM profiles").fetchall() + return [ResourceLimits.from_dict(json.loads(r[0])) for r in rows] + + def record_pid(self, pid: int, profile_name: str): + """Record that a PID has been assigned to a profile.""" + with sqlite3.connect(self.db_path) as conn: + conn.execute( + "INSERT OR REPLACE INTO applied_pids (pid, profile_name) VALUES (?, ?)", + (pid, profile_name) + ) + + def get_pids(self, profile_name: str) -> List[int]: + """Get PIDs assigned to a profile.""" + with sqlite3.connect(self.db_path) as conn: + rows = conn.execute( + "SELECT pid FROM applied_pids WHERE profile_name = ?", + (profile_name,) + ).fetchall() + return [r[0] for r in rows] + + +class CgroupsV2Controller: + """Interface to cgroups v2 filesystem.""" + + def __init__(self, root: Path = CGROUP_ROOT): + self.root = root + self.cortex_root = root / CORTEX_CGROUP + + def is_available(self) -> bool: + """Check if cgroups v2 is available.""" + return (self.root / "cgroup.controllers").exists() + + def is_user_delegated(self) -> bool: + """Check if user delegation is enabled.""" + user_slice = self.root / f"user.slice/user-{os.getuid()}.slice" + if user_slice.exists(): + subtree = user_slice / "cgroup.subtree_control" + if subtree.exists(): + return True + return os.getuid() == 0 + + def get_available_controllers(self) -> List[str]: + """Get list of available controllers.""" + controllers_file = self.root / "cgroup.controllers" + if controllers_file.exists(): + return controllers_file.read_text().strip().split() + return [] + + def create_cgroup(self, path: str) -> Tuple[bool, str]: + """Create a cgroup hierarchy.""" + cgroup_path = self.root / path + try: + cgroup_path.mkdir(parents=True, exist_ok=True) + + # Enable controllers on parent + parent = cgroup_path.parent + if parent != self.root: + subtree_control = parent / "cgroup.subtree_control" + if subtree_control.exists(): + controllers = self.get_available_controllers() + for ctrl in ['cpu', 'memory', 'io']: + if ctrl in controllers: + try: + subtree_control.write_text(f"+{ctrl}") + except PermissionError: + pass + + return True, str(cgroup_path) + except PermissionError as e: + return False, f"Permission denied: {e}" + except Exception as e: + return False, str(e) + + def apply_cpu_limits(self, path: str, quota: float, weight: int = 100) -> bool: + """ + Apply CPU limits to a cgroup. + + Args: + path: cgroup path relative to root + quota: CPU percentage (100 = 1 core) + weight: CPU weight (1-10000, default 100) + """ + cgroup_path = self.root / path + + # cpu.max format: MAX PERIOD (microseconds) + # quota of 400% = 400000us per 100000us period = 4 cores + period = 100000 + max_quota = int(quota * 1000) # convert percentage to microseconds + + try: + cpu_max = cgroup_path / "cpu.max" + if cpu_max.exists(): + cpu_max.write_text(f"{max_quota} {period}") + + cpu_weight = cgroup_path / "cpu.weight" + if cpu_weight.exists(): + # weight must be 1-10000 + cpu_weight.write_text(str(max(1, min(10000, weight)))) + + return True + except PermissionError: + logger.warning(f"Permission denied setting CPU limits for {path}") + return False + except Exception as e: + logger.error(f"Failed to set CPU limits: {e}") + return False + + def apply_memory_limits(self, path: str, max_bytes: int, high_bytes: int = 0) -> bool: + """ + Apply memory limits to a cgroup. + + Args: + path: cgroup path relative to root + max_bytes: hard memory limit + high_bytes: soft memory limit (triggers reclaim) + """ + cgroup_path = self.root / path + + try: + memory_max = cgroup_path / "memory.max" + if memory_max.exists(): + memory_max.write_text(str(max_bytes)) + + if high_bytes > 0: + memory_high = cgroup_path / "memory.high" + if memory_high.exists(): + memory_high.write_text(str(high_bytes)) + + return True + except PermissionError: + logger.warning(f"Permission denied setting memory limits for {path}") + return False + except Exception as e: + logger.error(f"Failed to set memory limits: {e}") + return False + + def add_pid(self, path: str, pid: int) -> bool: + """Add a process to a cgroup.""" + cgroup_path = self.root / path + procs_file = cgroup_path / "cgroup.procs" + + try: + if procs_file.exists(): + procs_file.write_text(str(pid)) + return True + except PermissionError: + logger.warning(f"Permission denied adding PID {pid} to {path}") + except Exception as e: + logger.error(f"Failed to add PID: {e}") + return False + + def get_pids(self, path: str) -> List[int]: + """Get PIDs in a cgroup.""" + cgroup_path = self.root / path + procs_file = cgroup_path / "cgroup.procs" + + try: + if procs_file.exists(): + content = procs_file.read_text().strip() + if content: + return [int(p) for p in content.split('\n')] + except Exception: + pass + return [] + + def delete_cgroup(self, path: str) -> bool: + """Delete a cgroup (must be empty).""" + cgroup_path = self.root / path + try: + if cgroup_path.exists(): + cgroup_path.rmdir() + return True + except OSError: + return False + + +class OOMScoreManager: + """Manage OOM score adjustments for processes.""" + + @staticmethod + def set_oom_score_adj(pid: int, score: int) -> bool: + """ + Set OOM score adjustment for a process. + + Args: + pid: Process ID + score: -1000 (never kill) to 1000 (always kill first) + """ + score = max(-1000, min(1000, score)) + oom_file = Path(f"/proc/{pid}/oom_score_adj") + + try: + if oom_file.exists(): + oom_file.write_text(str(score)) + return True + except PermissionError: + logger.warning(f"Permission denied setting OOM score for PID {pid}") + except Exception as e: + logger.error(f"Failed to set OOM score: {e}") + return False + + @staticmethod + def get_oom_score_adj(pid: int) -> Optional[int]: + """Get current OOM score adjustment.""" + oom_file = Path(f"/proc/{pid}/oom_score_adj") + try: + if oom_file.exists(): + return int(oom_file.read_text().strip()) + except Exception: + pass + return None class AcceleratorLimitsManager: + """Main manager for accelerator-aware resource limits.""" + def __init__(self): self.db = LimitsDatabase() - + self.cgroups = CgroupsV2Controller() + self.oom = OOMScoreManager() + def create(self, limits: ResourceLimits) -> bool: + """Create a new resource limit profile.""" + # Save to database self.db.save(limits) - print(f"✅ Created profile '{limits.name}' (preset: {limits.preset})") + + # Create cgroup + success, msg = self.cgroups.create_cgroup(limits.cgroup_path) + if success: + # Apply limits to cgroup + self.cgroups.apply_cpu_limits( + limits.cgroup_path, + limits.cpu_quota, + limits.cpu_weight + ) + self.cgroups.apply_memory_limits( + limits.cgroup_path, + limits.memory_max, + limits.memory_high + ) + print(f"Created profile '{limits.name}' (preset: {limits.preset})") + print(f" CPU: {limits.cpu_quota/100:.0f} cores, Memory: {limits.memory_max/1e9:.0f}GB") + if limits.gpu_ids: + print(f" GPUs: {','.join(map(str, limits.gpu_ids))}") + else: + print(f"Created profile '{limits.name}' (cgroup creation failed: {msg})") + print(" Profile saved, but system limits not applied.") + return True - - def get_env(self, name: str) -> Dict[str, str]: - limits = self.db.get(name) + + def apply(self, profile_name: str, pid: int) -> bool: + """Apply a profile's limits to a running process.""" + limits = self.db.get(profile_name) + if not limits: + print(f"Profile '{profile_name}' not found") + return False + + # Verify process exists + if not Path(f"/proc/{pid}").exists(): + print(f"Process {pid} not found") + return False + + success = True + + # Add to cgroup + if self.cgroups.add_pid(limits.cgroup_path, pid): + print(f"Added PID {pid} to cgroup {limits.cgroup_path}") + else: + print(f"Could not add PID {pid} to cgroup (may need root)") + success = False + + # Set OOM score + if self.oom.set_oom_score_adj(pid, limits.oom_score_adj): + print(f"Set OOM score adjustment to {limits.oom_score_adj}") + else: + print(f"Could not set OOM score (may need root)") + success = False + + # Record in database + self.db.record_pid(pid, profile_name) + + return success + + def get_env(self, profile_name: str) -> Dict[str, str]: + """Get environment variables for a profile.""" + limits = self.db.get(profile_name) if not limits: return {} - return {"CUDA_VISIBLE_DEVICES": ",".join(map(str, limits.gpu_ids))} - - def status(self): + + env = {} + + # GPU isolation + if limits.gpu_ids: + env["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, limits.gpu_ids)) + # Also set for AMD ROCm + env["HIP_VISIBLE_DEVICES"] = env["CUDA_VISIBLE_DEVICES"] + # Intel oneAPI + env["ONEAPI_DEVICE_SELECTOR"] = f"level_zero:{','.join(map(str, limits.gpu_ids))}" + + # Memory hints for ML frameworks + if limits.memory_max > 0: + mem_gb = limits.memory_max // (1024**3) + env["TF_MEMORY_ALLOCATION"] = str(mem_gb * 1024) # MB for TensorFlow + env["PYTORCH_CUDA_ALLOC_CONF"] = f"max_split_size_mb:{mem_gb * 512}" + + # CPU hints + if limits.cpu_quota > 0: + cores = int(limits.cpu_quota / 100) + env["OMP_NUM_THREADS"] = str(cores) + env["MKL_NUM_THREADS"] = str(cores) + env["OPENBLAS_NUM_THREADS"] = str(cores) + + return env + + def print_env(self, profile_name: str): + """Print environment variables as shell exports.""" + env = self.get_env(profile_name) + if not env: + print(f"# Profile '{profile_name}' not found or has no env vars", file=__import__('sys').stderr) + return + + for key, value in env.items(): + print(f"export {key}={value}") + + def status(self, profile_name: Optional[str] = None): + """Show status of profiles.""" + if profile_name: + limits = self.db.get(profile_name) + if not limits: + print(f"Profile '{profile_name}' not found") + return + self._print_profile_detail(limits) + else: + self._print_profiles_table() + + def _print_profiles_table(self): + """Print summary table of all profiles.""" profiles = self.db.list_all() - print(f"\n{'NAME':<20} {'PRESET':<12} {'CPU':<8} {'MEMORY':<10} {'GPUS':<10}") - print("-" * 65) + + if not profiles: + print("No profiles configured. Create one with:") + print(" cortex limits create --preset inference --gpus 2") + return + + print(f"\n{'NAME':<20} {'PRESET':<12} {'CPU':<8} {'MEMORY':<10} {'GPUS':<10} {'OOM':<8}") + print("-" * 75) + for p in profiles: - gpus = ",".join(map(str, p.gpu_ids)) or "-" - print(f"{p.name:<20} {p.preset:<12} {p.cpu_quota/100:.0f}{'':<5} {p.memory_max/1e9:.0f}G{'':<5} {gpus:<10}") + gpus = ",".join(map(str, p.gpu_ids)) if p.gpu_ids else "-" + cpu_cores = f"{p.cpu_quota/100:.0f}" + memory = f"{p.memory_max/1e9:.0f}G" + print(f"{p.name:<20} {p.preset:<12} {cpu_cores:<8} {memory:<10} {gpus:<10} {p.oom_score_adj:<8}") + print() + + def _print_profile_detail(self, limits: ResourceLimits): + """Print detailed information about a profile.""" + print(f"\n=== Profile: {limits.name} ===") + print(f"Preset: {limits.preset}") + print(f"cgroup path: {limits.cgroup_path}") + print() + print("CPU Limits:") + print(f" Quota: {limits.cpu_quota/100:.1f} cores ({limits.cpu_quota}%)") + print(f" Weight: {limits.cpu_weight}") + if limits.cpu_affinity: + print(f" Affinity: CPUs {','.join(map(str, limits.cpu_affinity))}") + print() + print("Memory Limits:") + print(f" Hard limit (max): {limits.memory_max/1e9:.1f} GB") + print(f" Soft limit (high): {limits.memory_high/1e9:.1f} GB") + print() + print("GPU Configuration:") + if limits.gpu_ids: + print(f" Visible GPUs: {','.join(map(str, limits.gpu_ids))}") + else: + print(" No GPU restrictions") + print(f" GPU allocation: {limits.gpu_percent}%") + print() + print(f"OOM Score Adjustment: {limits.oom_score_adj}") + print() + + # Show applied PIDs + pids = self.db.get_pids(limits.name) + active_pids = [p for p in pids if Path(f"/proc/{p}").exists()] + if active_pids: + print(f"Active processes: {', '.join(map(str, active_pids))}") + + # Show cgroup PIDs + cgroup_pids = self.cgroups.get_pids(limits.cgroup_path) + if cgroup_pids: + print(f"Processes in cgroup: {', '.join(map(str, cgroup_pids))}") + print() + + def delete(self, profile_name: str) -> bool: + """Delete a profile.""" + limits = self.db.get(profile_name) + if not limits: + print(f"Profile '{profile_name}' not found") + return False + + # Try to delete cgroup + self.cgroups.delete_cgroup(limits.cgroup_path) + + # Delete from database + self.db.delete(profile_name) + print(f"Deleted profile '{profile_name}'") + return True + + def list_presets(self): + """List available presets.""" + print("\nAvailable Presets:") + print("-" * 70) + print(f"{'PRESET':<15} {'CPU':<8} {'MEMORY':<10} {'OOM':<8} {'DESCRIPTION':<25}") + print("-" * 70) + + for name, config in PRESETS.items(): + print(f"{name:<15} {config['cpu_quota']/100:.0f}{'':<5} " + f"{config['memory_gb']}G{'':<6} {config['oom_score_adj']:<8} " + f"{config['description']:<25}") + print() def main(): + """CLI entry point.""" import argparse - parser = argparse.ArgumentParser(description="Cortex Accelerator Limits") - sub = parser.add_subparsers(dest="cmd") - - c = sub.add_parser("create") - c.add_argument("name") - c.add_argument("--preset", default="inference") - c.add_argument("--gpus", type=int, default=0) - - sub.add_parser("env").add_argument("name") - sub.add_parser("status") - sub.add_parser("list") - + import sys + + parser = argparse.ArgumentParser( + description="Cortex Accelerator-Aware Resource Limits", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + cortex limits create inference-job --preset inference --gpus 2 + cortex limits apply inference-job --pid 12345 + eval $(cortex limits env inference-job) + cortex limits status inference-job + cortex limits list + cortex limits presets + """ + ) + + subparsers = parser.add_subparsers(dest="command", help="Command to run") + + # create command + create_parser = subparsers.add_parser("create", help="Create a new profile") + create_parser.add_argument("name", help="Profile name") + create_parser.add_argument("--preset", default="inference", + choices=list(PRESETS.keys()), + help="Workload preset (default: inference)") + create_parser.add_argument("--gpus", type=int, default=0, + help="Number of GPUs to allocate") + create_parser.add_argument("--cpu", type=float, + help="CPU quota percentage (100 = 1 core)") + create_parser.add_argument("--memory", type=int, + help="Memory limit in GB") + create_parser.add_argument("--oom-adj", type=int, + help="OOM score adjustment (-1000 to 1000)") + + # apply command + apply_parser = subparsers.add_parser("apply", help="Apply profile to a process") + apply_parser.add_argument("name", help="Profile name") + apply_parser.add_argument("--pid", type=int, required=True, + help="Process ID to apply limits to") + + # env command + env_parser = subparsers.add_parser("env", help="Print environment variables") + env_parser.add_argument("name", help="Profile name") + + # status command + status_parser = subparsers.add_parser("status", help="Show profile status") + status_parser.add_argument("name", nargs="?", help="Profile name (optional)") + + # list command + subparsers.add_parser("list", help="List all profiles") + + # delete command + delete_parser = subparsers.add_parser("delete", help="Delete a profile") + delete_parser.add_argument("name", help="Profile name") + + # presets command + subparsers.add_parser("presets", help="List available presets") + args = parser.parse_args() + + if not args.command: + parser.print_help() + sys.exit(1) + mgr = AcceleratorLimitsManager() - - if args.cmd == "create": - mgr.create(ResourceLimits.from_preset(args.name, args.preset, args.gpus)) - elif args.cmd == "env": - for k, v in mgr.get_env(args.name).items(): - print(f"export {k}={v}") - elif args.cmd in ("status", "list"): + + if args.command == "create": + limits = ResourceLimits.from_preset(args.name, args.preset, args.gpus) + + # Override with explicit arguments + if args.cpu is not None: + limits.cpu_quota = args.cpu + if args.memory is not None: + limits.memory_max = args.memory * 1024**3 + limits.memory_high = int(limits.memory_max * 0.875) + if args.oom_adj is not None: + limits.oom_score_adj = args.oom_adj + + mgr.create(limits) + + elif args.command == "apply": + mgr.apply(args.name, args.pid) + + elif args.command == "env": + mgr.print_env(args.name) + + elif args.command == "status": + mgr.status(args.name) + + elif args.command == "list": mgr.status() + elif args.command == "delete": + mgr.delete(args.name) + + elif args.command == "presets": + mgr.list_presets() + if __name__ == "__main__": main() diff --git a/tests/test_accelerator_limits.py b/tests/test_accelerator_limits.py new file mode 100644 index 00000000..06ae4490 --- /dev/null +++ b/tests/test_accelerator_limits.py @@ -0,0 +1,394 @@ +#!/usr/bin/env python3 +""" +Unit tests for Cortex Accelerator-Aware Resource Limits + +Tests cover: +- Profile creation with presets +- Resource limit configuration +- Database operations +- cgroups v2 controller (mocked for non-root) +- OOM score management +- Environment variable generation +""" + +import os +import sys +import json +import tempfile +import unittest +from pathlib import Path +from unittest.mock import patch, MagicMock + +# Add parent to path for imports +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from cortex.kernel_features.accelerator_limits import ( + ResourceLimits, + LimitsDatabase, + CgroupsV2Controller, + OOMScoreManager, + AcceleratorLimitsManager, + PRESETS, + WorkloadPreset, + CORTEX_CGROUP +) + + +class TestResourceLimits(unittest.TestCase): + """Test ResourceLimits dataclass.""" + + def test_default_values(self): + """Test default values are set correctly.""" + limits = ResourceLimits(name="test") + self.assertEqual(limits.name, "test") + self.assertEqual(limits.preset, "inference") + self.assertEqual(limits.cpu_quota, 400.0) + self.assertEqual(limits.cpu_weight, 100) + self.assertEqual(limits.gpu_ids, []) + self.assertEqual(limits.oom_score_adj, 0) + + def test_from_preset_inference(self): + """Test creating limits from inference preset.""" + limits = ResourceLimits.from_preset("my-job", "inference", gpus=2) + self.assertEqual(limits.name, "my-job") + self.assertEqual(limits.preset, "inference") + self.assertEqual(limits.cpu_quota, 400) + self.assertEqual(limits.memory_max, 32 * 1024**3) + self.assertEqual(limits.gpu_ids, [0, 1]) + self.assertEqual(limits.oom_score_adj, -500) + + def test_from_preset_training(self): + """Test creating limits from training preset.""" + limits = ResourceLimits.from_preset("train-job", "training", gpus=4) + self.assertEqual(limits.cpu_quota, 1600) + self.assertEqual(limits.memory_max, 128 * 1024**3) + self.assertEqual(limits.gpu_ids, [0, 1, 2, 3]) + self.assertEqual(limits.oom_score_adj, -800) + + def test_from_preset_batch(self): + """Test creating limits from batch preset.""" + limits = ResourceLimits.from_preset("batch-job", "batch") + self.assertEqual(limits.cpu_quota, 800) + self.assertEqual(limits.memory_max, 64 * 1024**3) + self.assertEqual(limits.oom_score_adj, 0) + + def test_from_preset_interactive(self): + """Test creating limits from interactive preset.""" + limits = ResourceLimits.from_preset("dev-env", "interactive") + self.assertEqual(limits.cpu_quota, 200) + self.assertEqual(limits.memory_max, 16 * 1024**3) + self.assertEqual(limits.oom_score_adj, -200) + + def test_from_preset_unknown_falls_back(self): + """Test unknown preset falls back to inference.""" + limits = ResourceLimits.from_preset("job", "unknown-preset") + self.assertEqual(limits.preset, "unknown-preset") + self.assertEqual(limits.cpu_quota, PRESETS["inference"]["cpu_quota"]) + + def test_cgroup_path_auto_generated(self): + """Test cgroup path is auto-generated.""" + limits = ResourceLimits(name="test-job") + self.assertEqual(limits.cgroup_path, f"{CORTEX_CGROUP}/test-job") + + def test_to_dict_and_from_dict(self): + """Test serialization roundtrip.""" + original = ResourceLimits.from_preset("roundtrip", "training", gpus=2) + data = original.to_dict() + restored = ResourceLimits.from_dict(data) + + self.assertEqual(restored.name, original.name) + self.assertEqual(restored.preset, original.preset) + self.assertEqual(restored.cpu_quota, original.cpu_quota) + self.assertEqual(restored.memory_max, original.memory_max) + self.assertEqual(restored.gpu_ids, original.gpu_ids) + + +class TestLimitsDatabase(unittest.TestCase): + """Test LimitsDatabase storage.""" + + def setUp(self): + """Create temporary database.""" + self.temp_dir = tempfile.mkdtemp() + self.db_path = Path(self.temp_dir) / "test.db" + self.db = LimitsDatabase(self.db_path) + + def tearDown(self): + """Clean up temporary files.""" + import shutil + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def test_save_and_get(self): + """Test saving and retrieving a profile.""" + limits = ResourceLimits.from_preset("test-save", "inference", gpus=1) + self.db.save(limits) + + retrieved = self.db.get("test-save") + self.assertIsNotNone(retrieved) + self.assertEqual(retrieved.name, "test-save") + self.assertEqual(retrieved.preset, "inference") + self.assertEqual(retrieved.gpu_ids, [0]) + + def test_get_nonexistent(self): + """Test getting a profile that doesn't exist.""" + result = self.db.get("nonexistent") + self.assertIsNone(result) + + def test_list_all(self): + """Test listing all profiles.""" + self.db.save(ResourceLimits.from_preset("job1", "inference")) + self.db.save(ResourceLimits.from_preset("job2", "training")) + self.db.save(ResourceLimits.from_preset("job3", "batch")) + + profiles = self.db.list_all() + self.assertEqual(len(profiles), 3) + names = [p.name for p in profiles] + self.assertIn("job1", names) + self.assertIn("job2", names) + self.assertIn("job3", names) + + def test_delete(self): + """Test deleting a profile.""" + self.db.save(ResourceLimits.from_preset("to-delete", "inference")) + self.assertIsNotNone(self.db.get("to-delete")) + + self.db.delete("to-delete") + self.assertIsNone(self.db.get("to-delete")) + + def test_update_existing(self): + """Test updating an existing profile.""" + limits = ResourceLimits.from_preset("update-test", "inference") + self.db.save(limits) + + # Update with different values + limits.cpu_quota = 800 + limits.gpu_ids = [0, 1, 2] + self.db.save(limits) + + retrieved = self.db.get("update-test") + self.assertEqual(retrieved.cpu_quota, 800) + self.assertEqual(retrieved.gpu_ids, [0, 1, 2]) + + def test_record_and_get_pids(self): + """Test recording and retrieving PIDs.""" + self.db.save(ResourceLimits.from_preset("pid-test", "inference")) + + self.db.record_pid(1234, "pid-test") + self.db.record_pid(5678, "pid-test") + + pids = self.db.get_pids("pid-test") + self.assertEqual(len(pids), 2) + self.assertIn(1234, pids) + self.assertIn(5678, pids) + + +class TestCgroupsV2Controller(unittest.TestCase): + """Test cgroups v2 controller (with mocks for non-root).""" + + def test_is_available_with_mock(self): + """Test checking cgroups v2 availability.""" + with tempfile.TemporaryDirectory() as tmpdir: + root = Path(tmpdir) + (root / "cgroup.controllers").write_text("cpu memory io") + + controller = CgroupsV2Controller(root) + self.assertTrue(controller.is_available()) + + def test_is_available_without_file(self): + """Test availability check when file doesn't exist.""" + with tempfile.TemporaryDirectory() as tmpdir: + controller = CgroupsV2Controller(Path(tmpdir)) + self.assertFalse(controller.is_available()) + + def test_get_available_controllers(self): + """Test getting list of available controllers.""" + with tempfile.TemporaryDirectory() as tmpdir: + root = Path(tmpdir) + (root / "cgroup.controllers").write_text("cpu memory io pids") + + controller = CgroupsV2Controller(root) + controllers = controller.get_available_controllers() + self.assertEqual(controllers, ["cpu", "memory", "io", "pids"]) + + def test_create_cgroup_mock(self): + """Test creating a cgroup directory.""" + with tempfile.TemporaryDirectory() as tmpdir: + root = Path(tmpdir) + (root / "cgroup.controllers").write_text("cpu memory") + + controller = CgroupsV2Controller(root) + success, path = controller.create_cgroup("cortex.slice/test-job") + + self.assertTrue(success) + self.assertTrue((root / "cortex.slice" / "test-job").exists()) + + def test_get_pids_empty(self): + """Test getting PIDs from empty cgroup.""" + with tempfile.TemporaryDirectory() as tmpdir: + root = Path(tmpdir) + cgroup = root / "test-cgroup" + cgroup.mkdir() + (cgroup / "cgroup.procs").write_text("") + + controller = CgroupsV2Controller(root) + pids = controller.get_pids("test-cgroup") + self.assertEqual(pids, []) + + def test_get_pids_with_processes(self): + """Test getting PIDs from cgroup with processes.""" + with tempfile.TemporaryDirectory() as tmpdir: + root = Path(tmpdir) + cgroup = root / "test-cgroup" + cgroup.mkdir() + (cgroup / "cgroup.procs").write_text("1234\n5678\n9012") + + controller = CgroupsV2Controller(root) + pids = controller.get_pids("test-cgroup") + self.assertEqual(pids, [1234, 5678, 9012]) + + +class TestOOMScoreManager(unittest.TestCase): + """Test OOM score management.""" + + def test_get_oom_score_current_process(self): + """Test getting OOM score of current process.""" + pid = os.getpid() + score = OOMScoreManager.get_oom_score_adj(pid) + self.assertIsNotNone(score) + self.assertIsInstance(score, int) + self.assertGreaterEqual(score, -1000) + self.assertLessEqual(score, 1000) + + def test_get_oom_score_invalid_pid(self): + """Test getting OOM score of invalid PID.""" + score = OOMScoreManager.get_oom_score_adj(99999999) + self.assertIsNone(score) + + +class TestAcceleratorLimitsManager(unittest.TestCase): + """Test the main manager class.""" + + def setUp(self): + """Create manager with temporary database.""" + self.temp_dir = tempfile.mkdtemp() + self.db_path = Path(self.temp_dir) / ".cortex" / "limits.db" + + # Patch the default database path + self.db_patcher = patch( + 'cortex.kernel_features.accelerator_limits.CORTEX_DB', + self.db_path + ) + self.db_patcher.start() + + self.mgr = AcceleratorLimitsManager() + + def tearDown(self): + """Clean up.""" + self.db_patcher.stop() + import shutil + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def test_create_profile(self): + """Test creating a profile.""" + limits = ResourceLimits.from_preset("create-test", "inference", gpus=2) + result = self.mgr.create(limits) + self.assertTrue(result) + + # Verify it was saved + retrieved = self.mgr.db.get("create-test") + self.assertIsNotNone(retrieved) + self.assertEqual(retrieved.name, "create-test") + + def test_get_env_with_gpus(self): + """Test getting environment variables for a profile with GPUs.""" + limits = ResourceLimits.from_preset("env-test", "inference", gpus=2) + self.mgr.create(limits) + + env = self.mgr.get_env("env-test") + self.assertIn("CUDA_VISIBLE_DEVICES", env) + self.assertEqual(env["CUDA_VISIBLE_DEVICES"], "0,1") + self.assertIn("HIP_VISIBLE_DEVICES", env) + self.assertIn("OMP_NUM_THREADS", env) + + def test_get_env_no_gpus(self): + """Test environment variables when no GPUs configured.""" + limits = ResourceLimits.from_preset("no-gpu", "batch", gpus=0) + self.mgr.create(limits) + + env = self.mgr.get_env("no-gpu") + self.assertNotIn("CUDA_VISIBLE_DEVICES", env) + self.assertIn("OMP_NUM_THREADS", env) + + def test_get_env_nonexistent_profile(self): + """Test getting env for nonexistent profile.""" + env = self.mgr.get_env("nonexistent") + self.assertEqual(env, {}) + + def test_delete_profile(self): + """Test deleting a profile.""" + limits = ResourceLimits.from_preset("delete-test", "inference") + self.mgr.create(limits) + + self.assertIsNotNone(self.mgr.db.get("delete-test")) + self.mgr.delete("delete-test") + self.assertIsNone(self.mgr.db.get("delete-test")) + + def test_cpu_threads_calculation(self): + """Test that CPU thread count is calculated correctly.""" + # Training preset has 1600% CPU = 16 cores + limits = ResourceLimits.from_preset("threads-test", "training") + self.mgr.create(limits) + + env = self.mgr.get_env("threads-test") + self.assertEqual(env["OMP_NUM_THREADS"], "16") + self.assertEqual(env["MKL_NUM_THREADS"], "16") + + +class TestPresets(unittest.TestCase): + """Test preset configurations.""" + + def test_all_presets_exist(self): + """Test all expected presets are defined.""" + expected = ["inference", "training", "batch", "interactive"] + for preset in expected: + self.assertIn(preset, PRESETS) + + def test_preset_has_required_fields(self): + """Test each preset has required fields.""" + required = ["cpu_quota", "memory_gb", "oom_score_adj", "gpu_percent"] + for name, config in PRESETS.items(): + for field in required: + self.assertIn(field, config, f"Preset {name} missing {field}") + + def test_workload_preset_enum(self): + """Test WorkloadPreset enum values.""" + self.assertEqual(WorkloadPreset.INFERENCE.value, "inference") + self.assertEqual(WorkloadPreset.TRAINING.value, "training") + self.assertEqual(WorkloadPreset.BATCH.value, "batch") + self.assertEqual(WorkloadPreset.INTERACTIVE.value, "interactive") + + +class TestCLI(unittest.TestCase): + """Test CLI argument parsing.""" + + def test_help_output(self): + """Test that help doesn't crash.""" + from cortex.kernel_features.accelerator_limits import main + import sys + from io import StringIO + + old_argv = sys.argv + old_stdout = sys.stdout + + try: + sys.argv = ['accelerator_limits', '--help'] + sys.stdout = StringIO() + with self.assertRaises(SystemExit) as ctx: + main() + self.assertEqual(ctx.exception.code, 0) + finally: + sys.argv = old_argv + sys.stdout = old_stdout + + +if __name__ == "__main__": + unittest.main(verbosity=2)