From 2163c4469bf93bffae9bfe14beb133a16a05d401 Mon Sep 17 00:00:00 2001 From: Zafeeruddin Date: Fri, 19 Dec 2025 02:17:10 +0530 Subject: [PATCH 1/8] Added roles for few roles suggested --- cortex/roles/__init__.py | 3 +++ cortex/roles/datascience.yaml | 23 +++++++++++++++++++++++ cortex/roles/default.yaml | 14 ++++++++++++++ cortex/roles/devops.yaml | 23 +++++++++++++++++++++++ cortex/roles/security.yaml | 22 ++++++++++++++++++++++ cortex/roles/sysadmin.yaml | 26 ++++++++++++++++++++++++++ 6 files changed, 111 insertions(+) create mode 100644 cortex/roles/__init__.py create mode 100644 cortex/roles/datascience.yaml create mode 100644 cortex/roles/default.yaml create mode 100644 cortex/roles/devops.yaml create mode 100644 cortex/roles/security.yaml create mode 100644 cortex/roles/sysadmin.yaml diff --git a/cortex/roles/__init__.py b/cortex/roles/__init__.py new file mode 100644 index 00000000..123587a2 --- /dev/null +++ b/cortex/roles/__init__.py @@ -0,0 +1,3 @@ +# Built-in roles for Cortex Linux +# Role YAML files are loaded from this directory + diff --git a/cortex/roles/datascience.yaml b/cortex/roles/datascience.yaml new file mode 100644 index 00000000..0157eec1 --- /dev/null +++ b/cortex/roles/datascience.yaml @@ -0,0 +1,23 @@ +name: datascience +description: "Data science role for ML/AI, scientific computing, and data processing" +prompt_additions: | + Data science-focused guidelines: + - Optimize for GPU/CUDA support when installing ML frameworks + - Include cuDNN, CUDA toolkit setup for deep learning packages + - Set up Jupyter notebook/lab integration when relevant + - Configure Python virtual environments to avoid system conflicts + - Include scientific computing libraries (NumPy, SciPy, Pandas) as dependencies + - Set up proper BLAS/LAPACK optimizations (OpenBLAS, MKL) + - Configure large file handling (Git LFS) for datasets + - Include data format support (HDF5, Parquet, Arrow) + - Set up distributed computing frameworks when applicable (Dask, Spark) + - Configure memory-mapped file support for large datasets + - Include visualization libraries (Matplotlib, Plotly) + - Set up model serving infrastructure when relevant (TensorFlow Serving, TorchServe) + - Consider reproducibility (version pinning, environment export) +priorities: + - gpu-acceleration + - performance + - reproducibility + - scalability + diff --git a/cortex/roles/default.yaml b/cortex/roles/default.yaml new file mode 100644 index 00000000..bb73aa91 --- /dev/null +++ b/cortex/roles/default.yaml @@ -0,0 +1,14 @@ +name: default +description: "General-purpose role for standard package installation and system commands" +prompt_additions: | + Standard installation guidelines: + - Follow best practices for Debian/Ubuntu systems + - Use official package repositories when available + - Provide clear, sequential commands + - Include necessary dependencies automatically + - Use non-interactive flags (-y) for automation +priorities: + - reliability + - simplicity + - compatibility + diff --git a/cortex/roles/devops.yaml b/cortex/roles/devops.yaml new file mode 100644 index 00000000..e5b25671 --- /dev/null +++ b/cortex/roles/devops.yaml @@ -0,0 +1,23 @@ +name: devops +description: "DevOps-focused role for CI/CD, containers, and infrastructure automation" +prompt_additions: | + DevOps-focused guidelines: + - Optimize for automation and scriptability + - Include container runtime setup (Docker, Podman) when relevant + - Configure services for container orchestration compatibility (Kubernetes, Docker Swarm) + - Set up proper logging for centralized log aggregation (journald, syslog) + - Include health check endpoints and monitoring hooks + - Configure services for high availability when applicable + - Use infrastructure-as-code friendly configurations + - Include systemd service configurations for proper process management + - Set up appropriate resource limits (cgroups, ulimits) + - Configure for 12-factor app principles where applicable + - Include backup and disaster recovery considerations + - Set up proper environment variable handling + - Consider blue-green deployment compatibility +priorities: + - automation + - scalability + - observability + - reliability + diff --git a/cortex/roles/security.yaml b/cortex/roles/security.yaml new file mode 100644 index 00000000..083d8206 --- /dev/null +++ b/cortex/roles/security.yaml @@ -0,0 +1,22 @@ +name: security +description: "Security-focused role for auditing, hardening, and secure installations" +prompt_additions: | + Security-focused guidelines: + - ALWAYS prefer packages from official repositories only + - Suggest security hardening steps after installation (firewall rules, AppArmor/SELinux profiles) + - Warn about known vulnerabilities or CVEs if applicable + - Enable audit logging and security monitoring when available + - Prefer minimal installations to reduce attack surface + - Recommend firewall rules when installing network services + - Suggest fail2ban or similar intrusion prevention for exposed services + - Check file permissions and ownership for sensitive configurations + - Recommend TLS/SSL configuration for network services + - Include verification steps (checksums, signatures) when possible + - Avoid installing packages with known security issues + - Suggest running services with minimal privileges (non-root users, capabilities) +priorities: + - security + - minimal-attack-surface + - auditability + - compliance + diff --git a/cortex/roles/sysadmin.yaml b/cortex/roles/sysadmin.yaml new file mode 100644 index 00000000..a9814055 --- /dev/null +++ b/cortex/roles/sysadmin.yaml @@ -0,0 +1,26 @@ +name: sysadmin +description: "System administration role for monitoring, maintenance, and system health" +prompt_additions: | + System administration guidelines: + - Include comprehensive logging and log rotation setup + - Configure monitoring and alerting (Prometheus exporters, Nagios plugins) + - Set up automatic security updates (unattended-upgrades) + - Include backup solutions and schedule configuration + - Configure proper systemd service management + - Set up cron jobs for maintenance tasks + - Include disk usage monitoring and cleanup scripts + - Configure network monitoring tools (netstat, ss, iftop) + - Set up user and permission management + - Include SSH hardening and key management + - Configure NTP time synchronization + - Set up proper hostname and DNS resolution + - Include performance monitoring tools (htop, iotop, vmstat) + - Configure swap and memory management + - Set up proper boot and shutdown procedures + - Include disaster recovery procedures +priorities: + - stability + - monitoring + - maintenance + - backup + From 55f9e60f971ea807ce2ed2182ca2f972eca48a30 Mon Sep 17 00:00:00 2001 From: Zafeeruddin Date: Fri, 19 Dec 2025 02:18:03 +0530 Subject: [PATCH 2/8] Added role related methods --- cortex/cli.py | 191 ++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 187 insertions(+), 4 deletions(-) diff --git a/cortex/cli.py b/cortex/cli.py index 3a8ba5a4..c506874c 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -18,6 +18,10 @@ # Import the new Notification Manager from cortex.notification_manager import NotificationManager + +# Import Role Manager +from cortex.role_manager import RoleManager, RoleError, RoleNotFoundError + from cortex.user_preferences import ( PreferencesManager, format_preference_value, @@ -30,12 +34,13 @@ class CortexCLI: - def __init__(self, verbose: bool = False): + def __init__(self, verbose: bool = False, role: str = "default"): self.spinner_chars = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"] self.spinner_idx = 0 self.prefs_manager = None # Lazy initialization self.verbose = verbose self.offline = False + self.role = role def _debug(self, message: str): """Print debug info only in verbose mode""" @@ -192,6 +197,7 @@ def install(self, software: str, execute: bool = False, dry_run: bool = False): provider = self._get_provider() self._debug(f"Using provider: {provider}") self._debug(f"API key: {api_key[:10]}...{api_key[-4:]}") + self._debug(f"Using role: {self.role}") # Initialize installation history history = InstallationHistory() @@ -202,7 +208,7 @@ def install(self, software: str, execute: bool = False, dry_run: bool = False): self._print_status("🧠", "Understanding request...") interpreter = CommandInterpreter( - api_key=api_key, provider=provider, offline=self.offline + api_key=api_key, provider=provider, offline=self.offline, role=self.role ) self._print_status("📦", "Planning installation...") @@ -544,6 +550,131 @@ def demo(self): # (Keep existing demo logic) return 0 + # --- Role Management Methods --- + + def role_list(self): + """List all available roles""" + from rich.table import Table + + manager = RoleManager() + roles = manager.list_roles() + + cx_header("Available Roles") + + table = Table(show_header=True, header_style="bold cyan", box=None) + table.add_column("Role", style="green") + table.add_column("Description") + table.add_column("Type", style="dim") + + for role in roles: + role_type = "built-in" + if role["is_custom_override"]: + role_type = "custom (override)" + elif not role["is_builtin"]: + role_type = "custom" + + table.add_row(role["name"], role["description"], role_type) + + console.print(table) + console.print() + cx_print("Use [bold]cortex --role install [/bold] to use a role", "info") + return 0 + + def role_show(self, name: str): + """Show details of a specific role""" + try: + manager = RoleManager() + role = manager.get_role(name) + + cx_header(f"Role: {role.name}") + console.print(f"[bold]Description:[/bold] {role.description}") + console.print(f"[bold]Type:[/bold] {'Built-in' if role.is_builtin else 'Custom'}") + + if role.priorities: + console.print(f"[bold]Priorities:[/bold] {', '.join(role.priorities)}") + + console.print("\n[bold]Prompt Additions:[/bold]") + console.print(f"[dim]{role.prompt_additions}[/dim]") + + return 0 + except RoleNotFoundError: + self._print_error(f"Role '{name}' not found") + return 1 + + def role_create(self, name: str, description: str = "", from_template: str | None = None): + """Create a new custom role""" + import subprocess + import tempfile + + manager = RoleManager() + + # Check if role already exists + if manager.role_exists(name): + self._print_error(f"Role '{name}' already exists") + return 1 + + # If from_template, copy the template + if from_template: + try: + template_role = manager.get_role(from_template) + role = manager.create_role( + name=name, + description=description or f"Custom role based on {from_template}", + prompt_additions=template_role.prompt_additions, + priorities=template_role.priorities.copy(), + ) + self._print_success(f"Created role '{name}' from template '{from_template}'") + cx_print(f"Edit at: ~/.cortex/roles/{name}.yaml", "info") + return 0 + except RoleNotFoundError: + self._print_error(f"Template role '{from_template}' not found") + return 1 + except RoleError as e: + self._print_error(f"Failed to create role: {e}") + return 1 + + # Create from scratch - use template and open in editor + template = manager.get_role_template().replace("my-custom-role", name) + + if description: + template = template.replace( + '"Brief description of what this role does"', + f'"{description}"' + ) + + # Save template to the roles directory + role_path = manager.custom_roles_dir / f"{name}.yaml" + role_path.parent.mkdir(parents=True, exist_ok=True) + + with open(role_path, "w") as f: + f.write(template) + + self._print_success(f"Created role template at: {role_path}") + cx_print("Edit the file to customize your role's behavior", "info") + + # Try to open in editor + editor = os.environ.get("EDITOR", "nano") + cx_print(f"Opening in {editor}...", "info") + + try: + subprocess.run([editor, str(role_path)]) + except FileNotFoundError: + cx_print(f"Could not open editor. Edit the file manually: {role_path}", "warning") + + return 0 + + def role_delete(self, name: str): + """Delete a custom role""" + manager = RoleManager() + + try: + manager.delete_role(name) + self._print_success(f"Deleted role '{name}'") + return 0 + except RoleError as e: + self._print_error(str(e)) + return 1 + def show_rich_help(): """Display beautifully formatted help using Rich""" @@ -567,12 +698,20 @@ def show_rich_help(): table.add_row("install ", "Install software") table.add_row("history", "View history") table.add_row("rollback ", "Undo installation") - table.add_row("notify", "Manage desktop notifications") # Added this line + table.add_row("role", "Manage roles (list/show/create/delete)") + table.add_row("notify", "Manage desktop notifications") table.add_row("cache stats", "Show LLM cache statistics") table.add_row("doctor", "System health check") console.print(table) console.print() + + # Role examples + console.print("[bold cyan]Role-based Installation:[/bold cyan]") + console.print("[dim] cortex --role security install nginx[/dim]") + console.print("[dim] cortex --role devops install docker[/dim]") + console.print("[dim] cortex role list[/dim]") + console.print() console.print("[dim]Learn more: https://cortexlinux.com/docs[/dim]") @@ -605,6 +744,12 @@ def main(): parser.add_argument( "--offline", action="store_true", help="Use cached responses only (no network calls)" ) + parser.add_argument( + "--role", "-r", + type=str, + default="default", + help="Role for prompt customization (default, security, devops, datascience, sysadmin)" + ) subparsers = parser.add_subparsers(dest="command", help="Available commands") @@ -670,13 +815,34 @@ def main(): cache_subs = cache_parser.add_subparsers(dest="cache_action", help="Cache actions") cache_subs.add_parser("stats", help="Show cache statistics") + # Role commands + role_parser = subparsers.add_parser("role", help="Manage roles for prompt customization") + role_subs = role_parser.add_subparsers(dest="role_action", help="Role actions") + + role_subs.add_parser("list", help="List all available roles") + + role_show_parser = role_subs.add_parser("show", help="Show role details") + role_show_parser.add_argument("name", help="Role name to show") + + role_create_parser = role_subs.add_parser("create", help="Create a new custom role") + role_create_parser.add_argument("name", help="Name for the new role") + role_create_parser.add_argument( + "--description", "-d", default="", help="Short description of the role" + ) + role_create_parser.add_argument( + "--from", dest="from_template", help="Copy from existing role" + ) + + role_delete_parser = role_subs.add_parser("delete", help="Delete a custom role") + role_delete_parser.add_argument("name", help="Role name to delete") + args = parser.parse_args() if not args.command: show_rich_help() return 0 - cli = CortexCLI(verbose=args.verbose) + cli = CortexCLI(verbose=args.verbose, role=args.role) cli.offline = bool(getattr(args, "offline", False)) try: @@ -706,6 +872,23 @@ def main(): return cli.cache_stats() parser.print_help() return 1 + elif args.command == "role": + role_action = getattr(args, "role_action", None) + if role_action == "list": + return cli.role_list() + elif role_action == "show": + return cli.role_show(args.name) + elif role_action == "create": + return cli.role_create( + args.name, + description=args.description, + from_template=getattr(args, "from_template", None), + ) + elif role_action == "delete": + return cli.role_delete(args.name) + else: + # Default to list if no action specified + return cli.role_list() else: parser.print_help() return 1 From 4da2457d7b0e636136f6337d1e6a4389b5f55104 Mon Sep 17 00:00:00 2001 From: Zafeeruddin Date: Fri, 19 Dec 2025 02:18:49 +0530 Subject: [PATCH 3/8] role manager --- cortex/role_manager.py | 375 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 375 insertions(+) create mode 100644 cortex/role_manager.py diff --git a/cortex/role_manager.py b/cortex/role_manager.py new file mode 100644 index 00000000..6e4e5381 --- /dev/null +++ b/cortex/role_manager.py @@ -0,0 +1,375 @@ +""" +Role Manager for Cortex Linux +Manages role-based prompt customization for specialized tasks. + +Roles allow users to switch between different personas (security, devops, etc.) +that modify how the LLM interprets and generates commands. +""" + +import os +import shutil +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +import yaml + + +class RoleError(Exception): + """Custom exception for role-related errors""" + + pass + + +class RoleNotFoundError(RoleError): + """Raised when a role cannot be found""" + + pass + + +class RoleValidationError(RoleError): + """Raised when a role fails validation""" + + pass + + +@dataclass +class Role: + """Represents a Cortex role with its configuration""" + + name: str + description: str + prompt_additions: str + priorities: list[str] = field(default_factory=list) + is_builtin: bool = False + + def to_dict(self) -> dict[str, Any]: + """Convert role to dictionary for YAML serialization""" + return { + "name": self.name, + "description": self.description, + "prompt_additions": self.prompt_additions, + "priorities": self.priorities, + } + + @classmethod + def from_dict(cls, data: dict[str, Any], is_builtin: bool = False) -> "Role": + """Create a Role from a dictionary""" + return cls( + name=data.get("name", "unknown"), + description=data.get("description", ""), + prompt_additions=data.get("prompt_additions", ""), + priorities=data.get("priorities", []), + is_builtin=is_builtin, + ) + + +class RoleManager: + """ + Manages role loading, creation, and validation. + + Roles are loaded from two locations: + 1. Built-in roles: Bundled with the cortex package (cortex/roles/) + 2. Custom roles: User-defined roles (~/.cortex/roles/) + + Custom roles take precedence over built-in roles with the same name. + """ + + # Built-in role names + BUILTIN_ROLES = ["default", "security", "devops", "datascience", "sysadmin"] + + def __init__(self): + """Initialize the RoleManager""" + # Built-in roles directory (relative to this file) + self.builtin_roles_dir = Path(__file__).parent / "roles" + + # Custom roles directory + self.custom_roles_dir = Path.home() / ".cortex" / "roles" + self.custom_roles_dir.mkdir(parents=True, exist_ok=True) + + # Cache for loaded roles + self._roles_cache: dict[str, Role] = {} + + def _load_role_from_file(self, filepath: Path, is_builtin: bool = False) -> Role: + """ + Load a role from a YAML file. + + Args: + filepath: Path to the YAML file + is_builtin: Whether this is a built-in role + + Returns: + Role object + + Raises: + RoleValidationError: If the role file is invalid + """ + try: + with open(filepath) as f: + data = yaml.safe_load(f) + + if not data: + raise RoleValidationError(f"Empty role file: {filepath}") + + role = Role.from_dict(data, is_builtin=is_builtin) + self._validate_role(role) + return role + + except yaml.YAMLError as e: + raise RoleValidationError(f"Invalid YAML in role file {filepath}: {e}") + except OSError as e: + raise RoleNotFoundError(f"Cannot read role file {filepath}: {e}") + + def _validate_role(self, role: Role) -> None: + """ + Validate a role's structure and content. + + Args: + role: Role to validate + + Raises: + RoleValidationError: If validation fails + """ + if not role.name: + raise RoleValidationError("Role must have a name") + + if not role.name.isidentifier() and not role.name.replace("-", "_").isidentifier(): + raise RoleValidationError( + f"Invalid role name '{role.name}'. Use alphanumeric characters and hyphens only." + ) + + if not role.description: + raise RoleValidationError("Role must have a description") + + if not role.prompt_additions: + raise RoleValidationError("Role must have prompt_additions") + + def get_role(self, name: str) -> Role: + """ + Get a role by name. + + Custom roles take precedence over built-in roles. + + Args: + name: Role name + + Returns: + Role object + + Raises: + RoleNotFoundError: If role doesn't exist + """ + # Check cache first + if name in self._roles_cache: + return self._roles_cache[name] + + # Try custom role first + custom_path = self.custom_roles_dir / f"{name}.yaml" + if custom_path.exists(): + role = self._load_role_from_file(custom_path, is_builtin=False) + self._roles_cache[name] = role + return role + + # Try built-in role + builtin_path = self.builtin_roles_dir / f"{name}.yaml" + if builtin_path.exists(): + role = self._load_role_from_file(builtin_path, is_builtin=True) + self._roles_cache[name] = role + return role + + raise RoleNotFoundError(f"Role '{name}' not found") + + def list_roles(self) -> list[dict[str, Any]]: + """ + List all available roles (built-in and custom). + + Returns: + List of role info dictionaries with keys: name, description, is_builtin, is_custom_override + """ + roles = [] + seen_names = set() + + # List custom roles first (they take precedence) + if self.custom_roles_dir.exists(): + for filepath in sorted(self.custom_roles_dir.glob("*.yaml")): + try: + role = self._load_role_from_file(filepath, is_builtin=False) + is_override = role.name in self.BUILTIN_ROLES + roles.append( + { + "name": role.name, + "description": role.description, + "is_builtin": False, + "is_custom_override": is_override, + "priorities": role.priorities, + } + ) + seen_names.add(role.name) + except RoleError: + # Skip invalid role files + continue + + # List built-in roles (skip if overridden by custom) + if self.builtin_roles_dir.exists(): + for filepath in sorted(self.builtin_roles_dir.glob("*.yaml")): + try: + role = self._load_role_from_file(filepath, is_builtin=True) + if role.name not in seen_names: + roles.append( + { + "name": role.name, + "description": role.description, + "is_builtin": True, + "is_custom_override": False, + "priorities": role.priorities, + } + ) + seen_names.add(role.name) + except RoleError: + continue + + return roles + + def create_role( + self, + name: str, + description: str, + prompt_additions: str, + priorities: list[str] | None = None, + from_template: str | None = None, + ) -> Role: + """ + Create a new custom role. + + Args: + name: Role name (alphanumeric and hyphens) + description: Short description of the role + prompt_additions: Additional prompt text for the LLM + priorities: Optional list of priorities + from_template: Optional existing role name to copy from + + Returns: + Created Role object + + Raises: + RoleError: If creation fails + """ + # If copying from template, load it first + if from_template: + template_role = self.get_role(from_template) + if not description: + description = f"Custom role based on {from_template}" + if not prompt_additions: + prompt_additions = template_role.prompt_additions + if not priorities: + priorities = template_role.priorities.copy() + + role = Role( + name=name, + description=description, + prompt_additions=prompt_additions, + priorities=priorities or [], + is_builtin=False, + ) + + # Validate + self._validate_role(role) + + # Save to file + filepath = self.custom_roles_dir / f"{name}.yaml" + self._save_role(role, filepath) + + # Clear cache for this role + self._roles_cache.pop(name, None) + + return role + + def _save_role(self, role: Role, filepath: Path) -> None: + """ + Save a role to a YAML file. + + Args: + role: Role to save + filepath: Destination path + """ + # Ensure directory exists + filepath.parent.mkdir(parents=True, exist_ok=True) + + # Write atomically + temp_path = filepath.with_suffix(".yaml.tmp") + try: + with open(temp_path, "w") as f: + yaml.dump(role.to_dict(), f, default_flow_style=False, sort_keys=False) + temp_path.replace(filepath) + except Exception as e: + if temp_path.exists(): + temp_path.unlink() + raise RoleError(f"Failed to save role: {e}") + + def delete_role(self, name: str) -> bool: + """ + Delete a custom role. + + Args: + name: Role name to delete + + Returns: + True if deleted + + Raises: + RoleError: If deletion fails or trying to delete built-in role + """ + # Check if it's a built-in role without custom override + builtin_path = self.builtin_roles_dir / f"{name}.yaml" + custom_path = self.custom_roles_dir / f"{name}.yaml" + + if builtin_path.exists() and not custom_path.exists(): + raise RoleError(f"Cannot delete built-in role '{name}'") + + if not custom_path.exists(): + raise RoleNotFoundError(f"Custom role '{name}' not found") + + try: + custom_path.unlink() + self._roles_cache.pop(name, None) + return True + except OSError as e: + raise RoleError(f"Failed to delete role '{name}': {e}") + + def get_role_template(self) -> str: + """ + Get a template for creating new roles. + + Returns: + YAML template string + """ + return '''name: my-custom-role +description: "Brief description of what this role does" +prompt_additions: | + Additional guidelines for this role: + - Specific instruction 1 + - Specific instruction 2 + - Focus areas and priorities +priorities: + - priority1 + - priority2 +''' + + def role_exists(self, name: str) -> bool: + """ + Check if a role exists. + + Args: + name: Role name + + Returns: + True if role exists (built-in or custom) + """ + custom_path = self.custom_roles_dir / f"{name}.yaml" + builtin_path = self.builtin_roles_dir / f"{name}.yaml" + return custom_path.exists() or builtin_path.exists() + + def clear_cache(self) -> None: + """Clear the roles cache""" + self._roles_cache.clear() + From 6966724964c58a9055c6e77ae810878783f15fd3 Mon Sep 17 00:00:00 2001 From: Zafeeruddin Date: Fri, 19 Dec 2025 02:20:03 +0530 Subject: [PATCH 4/8] tested on ollama --- cortex/llm/interpreter.py | 58 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 56 insertions(+), 2 deletions(-) diff --git a/cortex/llm/interpreter.py b/cortex/llm/interpreter.py index 03f6614d..ca280565 100644 --- a/cortex/llm/interpreter.py +++ b/cortex/llm/interpreter.py @@ -18,7 +18,8 @@ class CommandInterpreter: """Interprets natural language commands into executable shell commands using LLM APIs. Supports multiple providers (OpenAI, Claude, Ollama) with optional semantic caching - and offline mode for cached responses. + and offline mode for cached responses. Supports role-based prompt customization + for specialized tasks (security, devops, datascience, sysadmin). """ def __init__( @@ -28,6 +29,7 @@ def __init__( model: str | None = None, offline: bool = False, cache: Optional["SemanticCache"] = None, + role: str = "default", ): """Initialize the command interpreter. @@ -37,10 +39,16 @@ def __init__( model: Optional model name override offline: If True, only use cached responses cache: Optional SemanticCache instance for response caching + role: Role name for prompt customization (default, security, devops, etc.) """ self.api_key = api_key self.provider = APIProvider(provider.lower()) self.offline = offline + self.role_name = role + + # Load role configuration + self._role = None + self._load_role(role) if cache is None: try: @@ -65,6 +73,30 @@ def __init__( self._initialize_client() + def _load_role(self, role_name: str) -> None: + """ + Load a role by name. + + Args: + role_name: Name of the role to load + + Raises: + ValueError: If role cannot be loaded + """ + try: + from cortex.role_manager import RoleManager, RoleNotFoundError + + manager = RoleManager() + self._role = manager.get_role(role_name) + except RoleNotFoundError: + raise ValueError(f"Role '{role_name}' not found. Use 'cortex role list' to see available roles.") + except Exception as e: + # Fallback to no role customization if role system fails + self._role = None + # Only warn, don't fail - the base prompt still works + import sys + print(f"Warning: Could not load role '{role_name}': {e}", file=sys.stderr) + def _initialize_client(self): if self.provider == APIProvider.OPENAI: try: @@ -86,7 +118,13 @@ def _initialize_client(self): self.client = None # Will use requests def _get_system_prompt(self) -> str: - return """You are a Linux system command expert. Convert natural language requests into safe, validated bash commands. + """ + Build the system prompt, incorporating role-specific additions if available. + + Returns: + Complete system prompt string + """ + base_prompt = """You are a Linux system command expert. Convert natural language requests into safe, validated bash commands. Rules: 1. Return ONLY a JSON array of commands @@ -103,6 +141,22 @@ def _get_system_prompt(self) -> str: Example request: "install docker with nvidia support" Example response: {"commands": ["sudo apt update", "sudo apt install -y docker.io", "sudo apt install -y nvidia-docker2", "sudo systemctl restart docker"]}""" + # Add role-specific prompt additions if available + if getattr(self, "_role", None) and self._role.prompt_additions: + role_section = f""" + +=== Role: {self._role.name} === +{self._role.description} + +{self._role.prompt_additions}""" + + if self._role.priorities: + role_section += f"\nPriorities: {', '.join(self._role.priorities)}" + + return base_prompt + role_section + + return base_prompt + def _call_openai(self, user_input: str) -> list[str]: try: response = self.client.chat.completions.create( From 58adc317f2f2862f1c698b288cac6ac45d2b658f Mon Sep 17 00:00:00 2001 From: Zafeeruddin Date: Fri, 19 Dec 2025 02:20:25 +0530 Subject: [PATCH 5/8] test case for role manager --- tests/test_role_manager.py | 424 +++++++++++++++++++++++++++++++++++++ 1 file changed, 424 insertions(+) create mode 100644 tests/test_role_manager.py diff --git a/tests/test_role_manager.py b/tests/test_role_manager.py new file mode 100644 index 00000000..1432b63b --- /dev/null +++ b/tests/test_role_manager.py @@ -0,0 +1,424 @@ +""" +Tests for the Role Manager module. +""" + +import tempfile +from pathlib import Path +from unittest import mock + +import pytest +import yaml + +from cortex.role_manager import ( + Role, + RoleError, + RoleManager, + RoleNotFoundError, + RoleValidationError, +) + + +class TestRole: + """Tests for the Role dataclass""" + + def test_role_creation(self): + """Test creating a Role instance""" + role = Role( + name="test-role", + description="A test role", + prompt_additions="Test prompt additions", + priorities=["security", "performance"], + ) + assert role.name == "test-role" + assert role.description == "A test role" + assert role.prompt_additions == "Test prompt additions" + assert role.priorities == ["security", "performance"] + assert role.is_builtin is False + + def test_role_to_dict(self): + """Test converting Role to dictionary""" + role = Role( + name="test-role", + description="A test role", + prompt_additions="Test prompt additions", + priorities=["security"], + ) + data = role.to_dict() + assert data["name"] == "test-role" + assert data["description"] == "A test role" + assert data["prompt_additions"] == "Test prompt additions" + assert data["priorities"] == ["security"] + # is_builtin should not be in the serialized dict + assert "is_builtin" not in data + + def test_role_from_dict(self): + """Test creating Role from dictionary""" + data = { + "name": "test-role", + "description": "A test role", + "prompt_additions": "Test prompt additions", + "priorities": ["reliability"], + } + role = Role.from_dict(data, is_builtin=True) + assert role.name == "test-role" + assert role.is_builtin is True + + def test_role_from_dict_defaults(self): + """Test Role.from_dict with missing optional fields""" + data = {"name": "minimal-role"} + role = Role.from_dict(data) + assert role.name == "minimal-role" + assert role.description == "" + assert role.prompt_additions == "" + assert role.priorities == [] + + +class TestRoleManager: + """Tests for the RoleManager class""" + + @pytest.fixture + def temp_dirs(self): + """Create temporary directories for testing""" + with tempfile.TemporaryDirectory() as tmpdir: + custom_dir = Path(tmpdir) / "custom_roles" + custom_dir.mkdir() + builtin_dir = Path(tmpdir) / "builtin_roles" + builtin_dir.mkdir() + yield custom_dir, builtin_dir + + @pytest.fixture + def manager_with_temp_dirs(self, temp_dirs): + """Create a RoleManager with temporary directories""" + custom_dir, builtin_dir = temp_dirs + manager = RoleManager() + manager.custom_roles_dir = custom_dir + manager.builtin_roles_dir = builtin_dir + return manager + + def test_manager_initialization(self): + """Test RoleManager initialization""" + manager = RoleManager() + assert manager.builtin_roles_dir.exists() or not manager.builtin_roles_dir.exists() + # Custom roles dir should be created + assert manager.custom_roles_dir.parent.exists() + + def test_validate_role_valid(self, manager_with_temp_dirs): + """Test validating a valid role""" + manager = manager_with_temp_dirs + role = Role( + name="valid-role", + description="A valid role", + prompt_additions="Some prompt additions", + ) + # Should not raise + manager._validate_role(role) + + def test_validate_role_missing_name(self, manager_with_temp_dirs): + """Test validation fails for role without name""" + manager = manager_with_temp_dirs + role = Role(name="", description="desc", prompt_additions="prompt") + with pytest.raises(RoleValidationError, match="must have a name"): + manager._validate_role(role) + + def test_validate_role_missing_description(self, manager_with_temp_dirs): + """Test validation fails for role without description""" + manager = manager_with_temp_dirs + role = Role(name="test", description="", prompt_additions="prompt") + with pytest.raises(RoleValidationError, match="must have a description"): + manager._validate_role(role) + + def test_validate_role_missing_prompt(self, manager_with_temp_dirs): + """Test validation fails for role without prompt_additions""" + manager = manager_with_temp_dirs + role = Role(name="test", description="desc", prompt_additions="") + with pytest.raises(RoleValidationError, match="must have prompt_additions"): + manager._validate_role(role) + + def test_validate_role_invalid_name(self, manager_with_temp_dirs): + """Test validation fails for invalid role name""" + manager = manager_with_temp_dirs + role = Role(name="invalid name!", description="desc", prompt_additions="prompt") + with pytest.raises(RoleValidationError, match="Invalid role name"): + manager._validate_role(role) + + def test_load_role_from_file(self, temp_dirs): + """Test loading a role from a YAML file""" + custom_dir, builtin_dir = temp_dirs + manager = RoleManager() + manager.custom_roles_dir = custom_dir + manager.builtin_roles_dir = builtin_dir + + # Create a test role file + role_data = { + "name": "test-role", + "description": "Test description", + "prompt_additions": "Test prompt", + "priorities": ["test"], + } + role_file = builtin_dir / "test-role.yaml" + with open(role_file, "w") as f: + yaml.dump(role_data, f) + + role = manager._load_role_from_file(role_file, is_builtin=True) + assert role.name == "test-role" + assert role.is_builtin is True + + def test_load_role_invalid_yaml(self, temp_dirs): + """Test loading a role with invalid YAML""" + custom_dir, builtin_dir = temp_dirs + manager = RoleManager() + manager.custom_roles_dir = custom_dir + manager.builtin_roles_dir = builtin_dir + + # Create an invalid YAML file + role_file = builtin_dir / "invalid.yaml" + with open(role_file, "w") as f: + f.write("invalid: yaml: content: [") + + with pytest.raises(RoleValidationError, match="Invalid YAML"): + manager._load_role_from_file(role_file) + + def test_get_role_custom_takes_precedence(self, temp_dirs): + """Test that custom roles take precedence over built-in""" + custom_dir, builtin_dir = temp_dirs + manager = RoleManager() + manager.custom_roles_dir = custom_dir + manager.builtin_roles_dir = builtin_dir + + # Create both builtin and custom role with same name + builtin_data = { + "name": "test-role", + "description": "Builtin version", + "prompt_additions": "Builtin prompt", + } + custom_data = { + "name": "test-role", + "description": "Custom version", + "prompt_additions": "Custom prompt", + } + + with open(builtin_dir / "test-role.yaml", "w") as f: + yaml.dump(builtin_data, f) + with open(custom_dir / "test-role.yaml", "w") as f: + yaml.dump(custom_data, f) + + role = manager.get_role("test-role") + assert role.description == "Custom version" + assert role.is_builtin is False + + def test_get_role_not_found(self, manager_with_temp_dirs): + """Test getting a non-existent role raises error""" + manager = manager_with_temp_dirs + with pytest.raises(RoleNotFoundError, match="not found"): + manager.get_role("nonexistent-role") + + def test_list_roles(self, temp_dirs): + """Test listing all roles""" + custom_dir, builtin_dir = temp_dirs + manager = RoleManager() + manager.custom_roles_dir = custom_dir + manager.builtin_roles_dir = builtin_dir + + # Create some roles + role1 = { + "name": "role1", + "description": "Role 1", + "prompt_additions": "Prompt 1", + } + role2 = { + "name": "role2", + "description": "Role 2", + "prompt_additions": "Prompt 2", + } + + with open(builtin_dir / "role1.yaml", "w") as f: + yaml.dump(role1, f) + with open(custom_dir / "role2.yaml", "w") as f: + yaml.dump(role2, f) + + roles = manager.list_roles() + names = [r["name"] for r in roles] + assert "role1" in names + assert "role2" in names + + def test_create_role(self, manager_with_temp_dirs): + """Test creating a new custom role""" + manager = manager_with_temp_dirs + role = manager.create_role( + name="new-role", + description="A new role", + prompt_additions="New prompt additions", + priorities=["test"], + ) + assert role.name == "new-role" + assert (manager.custom_roles_dir / "new-role.yaml").exists() + + def test_create_role_from_template(self, temp_dirs): + """Test creating a role from an existing template""" + custom_dir, builtin_dir = temp_dirs + manager = RoleManager() + manager.custom_roles_dir = custom_dir + manager.builtin_roles_dir = builtin_dir + + # Create template role + template_data = { + "name": "template", + "description": "Template role", + "prompt_additions": "Template prompt", + "priorities": ["priority1"], + } + with open(builtin_dir / "template.yaml", "w") as f: + yaml.dump(template_data, f) + + # Create new role from template + role = manager.create_role( + name="from-template", + description="", + prompt_additions="", + from_template="template", + ) + assert role.prompt_additions == "Template prompt" + assert role.priorities == ["priority1"] + + def test_delete_role(self, manager_with_temp_dirs): + """Test deleting a custom role""" + manager = manager_with_temp_dirs + + # Create a custom role first + manager.create_role( + name="to-delete", + description="Will be deleted", + prompt_additions="Delete me", + ) + assert (manager.custom_roles_dir / "to-delete.yaml").exists() + + # Delete it + result = manager.delete_role("to-delete") + assert result is True + assert not (manager.custom_roles_dir / "to-delete.yaml").exists() + + def test_delete_builtin_role_fails(self, temp_dirs): + """Test that deleting a built-in role fails""" + custom_dir, builtin_dir = temp_dirs + manager = RoleManager() + manager.custom_roles_dir = custom_dir + manager.builtin_roles_dir = builtin_dir + + # Create a builtin role + builtin_data = { + "name": "builtin", + "description": "Builtin role", + "prompt_additions": "Builtin prompt", + } + with open(builtin_dir / "builtin.yaml", "w") as f: + yaml.dump(builtin_data, f) + + with pytest.raises(RoleError, match="Cannot delete built-in role"): + manager.delete_role("builtin") + + def test_role_exists(self, temp_dirs): + """Test checking if a role exists""" + custom_dir, builtin_dir = temp_dirs + manager = RoleManager() + manager.custom_roles_dir = custom_dir + manager.builtin_roles_dir = builtin_dir + + # Create a role + role_data = { + "name": "existing", + "description": "Exists", + "prompt_additions": "Prompt", + } + with open(builtin_dir / "existing.yaml", "w") as f: + yaml.dump(role_data, f) + + assert manager.role_exists("existing") is True + assert manager.role_exists("nonexistent") is False + + def test_get_role_template(self, manager_with_temp_dirs): + """Test getting a role template""" + manager = manager_with_temp_dirs + template = manager.get_role_template() + assert "name:" in template + assert "description:" in template + assert "prompt_additions:" in template + assert "priorities:" in template + + def test_caching(self, temp_dirs): + """Test that roles are cached after first load""" + custom_dir, builtin_dir = temp_dirs + manager = RoleManager() + manager.custom_roles_dir = custom_dir + manager.builtin_roles_dir = builtin_dir + + role_data = { + "name": "cached", + "description": "Cached role", + "prompt_additions": "Cached prompt", + } + with open(builtin_dir / "cached.yaml", "w") as f: + yaml.dump(role_data, f) + + # First load + role1 = manager.get_role("cached") + # Second load should be from cache + role2 = manager.get_role("cached") + assert role1 is role2 # Same object from cache + + def test_clear_cache(self, temp_dirs): + """Test clearing the cache""" + custom_dir, builtin_dir = temp_dirs + manager = RoleManager() + manager.custom_roles_dir = custom_dir + manager.builtin_roles_dir = builtin_dir + + role_data = { + "name": "cached", + "description": "Cached role", + "prompt_additions": "Cached prompt", + } + with open(builtin_dir / "cached.yaml", "w") as f: + yaml.dump(role_data, f) + + role1 = manager.get_role("cached") + manager.clear_cache() + role2 = manager.get_role("cached") + assert role1 is not role2 # Different objects after cache clear + + +class TestBuiltinRoles: + """Tests for the built-in roles""" + + def test_builtin_roles_exist(self): + """Test that all built-in roles can be loaded""" + manager = RoleManager() + for role_name in RoleManager.BUILTIN_ROLES: + try: + role = manager.get_role(role_name) + assert role.name == role_name + assert role.description + assert role.prompt_additions + except RoleNotFoundError: + # Built-in roles might not exist in test environment + # if running from a different directory + pass + + def test_default_role_priorities(self): + """Test that default role has expected priorities""" + manager = RoleManager() + try: + role = manager.get_role("default") + assert "reliability" in role.priorities or len(role.priorities) >= 0 + except RoleNotFoundError: + pass # Skip if not in correct directory + + def test_security_role_content(self): + """Test that security role has security-focused content""" + manager = RoleManager() + try: + role = manager.get_role("security") + assert "security" in role.description.lower() + assert "security" in role.prompt_additions.lower() + except RoleNotFoundError: + pass + From 1d28da772b20e775a25204ebd9241ebf64641fab Mon Sep 17 00:00:00 2001 From: Zafeeruddin Date: Fri, 19 Dec 2025 02:33:46 +0530 Subject: [PATCH 6/8] fix: update test assertion from >= 0 to > 0 for collection length check --- tests/test_role_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_role_manager.py b/tests/test_role_manager.py index 1432b63b..4873d79f 100644 --- a/tests/test_role_manager.py +++ b/tests/test_role_manager.py @@ -408,7 +408,7 @@ def test_default_role_priorities(self): manager = RoleManager() try: role = manager.get_role("default") - assert "reliability" in role.priorities or len(role.priorities) >= 0 + assert "reliability" in role.priorities or len(role.priorities) > 0 except RoleNotFoundError: pass # Skip if not in correct directory From c1e1d6f5297eb718302c22333e3f6566799d9112 Mon Sep 17 00:00:00 2001 From: Zafeeruddin Date: Fri, 19 Dec 2025 02:46:10 +0530 Subject: [PATCH 7/8] fix: address SonarQube code quality issues in role manager and tests --- cortex/llm/interpreter.py | 7 ++++++- tests/test_role_manager.py | 18 +++++++++--------- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/cortex/llm/interpreter.py b/cortex/llm/interpreter.py index ca280565..e3119c0a 100644 --- a/cortex/llm/interpreter.py +++ b/cortex/llm/interpreter.py @@ -81,7 +81,12 @@ def _load_role(self, role_name: str) -> None: role_name: Name of the role to load Raises: - ValueError: If role cannot be loaded + ValueError: If the specified role is not found. + + Note: + For other exceptions (e.g., file read errors), a warning is logged + to stderr and self._role is set to None, allowing the interpreter + to continue with the default base prompt. """ try: from cortex.role_manager import RoleManager, RoleNotFoundError diff --git a/tests/test_role_manager.py b/tests/test_role_manager.py index 4873d79f..cfad6ebe 100644 --- a/tests/test_role_manager.py +++ b/tests/test_role_manager.py @@ -98,9 +98,11 @@ def manager_with_temp_dirs(self, temp_dirs): def test_manager_initialization(self): """Test RoleManager initialization""" manager = RoleManager() - assert manager.builtin_roles_dir.exists() or not manager.builtin_roles_dir.exists() - # Custom roles dir should be created - assert manager.custom_roles_dir.parent.exists() + # Verify builtin_roles_dir is a Path object + assert isinstance(manager.builtin_roles_dir, Path) + # Custom roles dir should be created and exist + assert isinstance(manager.custom_roles_dir, Path) + assert manager.custom_roles_dir.exists() def test_validate_role_valid(self, manager_with_temp_dirs): """Test validating a valid role""" @@ -399,18 +401,16 @@ def test_builtin_roles_exist(self): assert role.description assert role.prompt_additions except RoleNotFoundError: - # Built-in roles might not exist in test environment - # if running from a different directory - pass + pytest.skip(f"Built-in role '{role_name}' not found (running from different directory)") def test_default_role_priorities(self): """Test that default role has expected priorities""" manager = RoleManager() try: role = manager.get_role("default") - assert "reliability" in role.priorities or len(role.priorities) > 0 + assert "reliability" in role.priorities except RoleNotFoundError: - pass # Skip if not in correct directory + pytest.skip("Default role not found (running from different directory)") def test_security_role_content(self): """Test that security role has security-focused content""" @@ -420,5 +420,5 @@ def test_security_role_content(self): assert "security" in role.description.lower() assert "security" in role.prompt_additions.lower() except RoleNotFoundError: - pass + pytest.skip("Security role not found (running from different directory)") From 96620f9df578a6a7727d6c7c077693b212cf4ab9 Mon Sep 17 00:00:00 2001 From: Zafeeruddin Date: Fri, 19 Dec 2025 02:49:22 +0530 Subject: [PATCH 8/8] fix: remove unused local variable in role_create method --- cortex/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cortex/cli.py b/cortex/cli.py index c506874c..98acbf5b 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -617,7 +617,7 @@ def role_create(self, name: str, description: str = "", from_template: str | Non if from_template: try: template_role = manager.get_role(from_template) - role = manager.create_role( + manager.create_role( name=name, description=description or f"Custom role based on {from_template}", prompt_additions=template_role.prompt_additions,