diff --git a/cortex/cli.py b/cortex/cli.py index ea8976d1..6ffe57df 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -2,12 +2,11 @@ import logging import os import sys -import time from datetime import datetime from pathlib import Path from typing import TYPE_CHECKING, Any -from cortex.api_key_detector import auto_detect_api_key, setup_api_key +from cortex.api_key_detector import setup_api_key from cortex.ask import AskHandler from cortex.branding import VERSION, console, cx_header, cx_print, show_banner from cortex.coordinator import InstallationCoordinator, InstallationStep, StepStatus @@ -16,7 +15,6 @@ DependencyImporter, PackageEcosystem, ParseResult, - format_package_list, ) from cortex.env_manager import EnvironmentManager, get_env_manager from cortex.installation_history import InstallationHistory, InstallationStatus, InstallationType @@ -24,7 +22,19 @@ from cortex.network_config import NetworkConfig from cortex.notification_manager import NotificationManager from cortex.stack_manager import StackManager -from cortex.validators import validate_api_key, validate_install_request +from cortex.ui import ( + data_table, + error, + info, + progress_bar, + section, + spinner, + status_box, + success, + summary_box, + warning, +) +from cortex.validators import validate_install_request if TYPE_CHECKING: from cortex.shell_env_analyzer import ShellEnvironmentAnalyzer @@ -38,9 +48,8 @@ class CortexCLI: def __init__(self, verbose: bool = False): - self.spinner_chars = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"] - self.spinner_idx = 0 self.verbose = verbose + self._detected_provider: str | None = None # Define a method to handle Docker-specific permission repairs def docker_permissions(self, args: argparse.Namespace) -> int: @@ -123,7 +132,7 @@ def docker_permissions(self, args: argparse.Namespace) -> int: def _debug(self, message: str): """Print debug info only in verbose mode""" if self.verbose: - console.print(f"[dim][DEBUG] {message}[/dim]") + info(f"[dim]DEBUG[/dim] {message}", badge=True) def _get_api_key(self) -> str | None: # 1. Check explicit provider override first (fake/ollama need no key) @@ -135,24 +144,32 @@ def _get_api_key(self) -> str | None: self._debug("Using Ollama (no API key required)") return "ollama-local" - # 2. Try auto-detection + prompt to save (setup_api_key handles both) - success, key, detected_provider = setup_api_key() - if success: - self._debug(f"Using {detected_provider} API key") - # Store detected provider so _get_provider can use it + with spinner("Detecting API provider and credentials"): + success_flag, key, detected_provider = setup_api_key() + + if success_flag: self._detected_provider = detected_provider + success("API key loaded", f"Provider: {detected_provider}") return key # Still no key - self._print_error("No API key found or provided") - cx_print("Run [bold]cortex wizard[/bold] to configure your API key.", "info") - cx_print("Or use [bold]CORTEX_PROVIDER=ollama[/bold] for offline mode.", "info") + error("No API key found or provided") + info("Run [bold]cortex wizard[/bold] to configure your API key.", badge=True) + info("Or use [bold]CORTEX_PROVIDER=ollama[/bold] for offline mode.", badge=True) return None + def _animate_spinner(self, *_): + """No-op animation placeholder (kept for backward compatibility).""" + pass + + def _clear_line(self): + """No-op; kept for compatibility with earlier console-clearing helpers.""" + pass + def _get_provider(self) -> str: # Check environment variable for explicit provider choice explicit_provider = os.environ.get("CORTEX_PROVIDER", "").lower() - if explicit_provider in ["ollama", "openai", "claude", "fake"]: + if explicit_provider in {"ollama", "openai", "claude", "fake"}: return explicit_provider # Use provider from auto-detection (set by _get_api_key) @@ -172,54 +189,35 @@ def _get_provider(self) -> str: return "ollama" def _print_status(self, emoji: str, message: str): - """Legacy status print - maps to cx_print for Rich output""" - status_map = { - "🧠": "thinking", - "📦": "info", - "⚙️": "info", - "🔍": "info", - } - status = status_map.get(emoji, "info") - cx_print(message, status) + """Legacy status mapper. Internally routes to ui.py for consistent formatting.""" + info(message, badge=True) def _print_error(self, message: str): - cx_print(f"Error: {message}", "error") + error(message) def _print_success(self, message: str): - cx_print(message, "success") + success(message) - def _animate_spinner(self, message: str): - sys.stdout.write(f"\r{self.spinner_chars[self.spinner_idx]} {message}") - sys.stdout.flush() - self.spinner_idx = (self.spinner_idx + 1) % len(self.spinner_chars) - time.sleep(0.1) - - def _clear_line(self): - sys.stdout.write("\r\033[K") - sys.stdout.flush() - - # --- New Notification Method --- def notify(self, args): """Handle notification commands""" # Addressing CodeRabbit feedback: Handle missing subcommand gracefully if not args.notify_action: - self._print_error("Please specify a subcommand (config/enable/disable/dnd/send)") + error("Please specify a subcommand (config/enable/disable/dnd/send)") return 1 mgr = NotificationManager() + section("NOTIFICATIONS") if args.notify_action == "config": - console.print("[bold cyan]🔧 Current Notification Configuration:[/bold cyan]") - status = ( - "[green]Enabled[/green]" - if mgr.config.get("enabled", True) - else "[red]Disabled[/red]" - ) - console.print(f"Status: {status}") - console.print( - f"DND Window: [yellow]{mgr.config['dnd_start']} - {mgr.config['dnd_end']}[/yellow]" + status_box( + "NOTIFICATION SETTINGS", + { + "Status": "Enabled" if mgr.config.get("enabled", True) else "Disabled", + "Do Not Disturb": f"{mgr.config.get('dnd_start','—')} → {mgr.config.get('dnd_end','—')}", + "History File": mgr.history_file, + }, + border_color="green" if mgr.config.get("enabled", True) else "red", ) - console.print(f"History File: {mgr.history_file}") return 0 elif args.notify_action == "enable": @@ -227,18 +225,18 @@ def notify(self, args): # Addressing CodeRabbit feedback: Ideally should use a public method instead of private _save_config, # but keeping as is for a simple fix (or adding a save method to NotificationManager would be best). mgr._save_config() - self._print_success("Notifications enabled") + success("Notifications enabled") return 0 elif args.notify_action == "disable": mgr.config["enabled"] = False mgr._save_config() - cx_print("Notifications disabled (Critical alerts will still show)", "warning") + warning("Notifications disabled (critical alerts will still show)") return 0 elif args.notify_action == "dnd": if not args.start or not args.end: - self._print_error("Please provide start and end times (HH:MM)") + error("Please provide start and end times (HH:MM)") return 1 # Addressing CodeRabbit feedback: Add time format validation @@ -246,25 +244,33 @@ def notify(self, args): datetime.strptime(args.start, "%H:%M") datetime.strptime(args.end, "%H:%M") except ValueError: - self._print_error("Invalid time format. Use HH:MM (e.g., 22:00)") + error("Invalid time format. Use HH:MM (e.g., 22:00)") return 1 mgr.config["dnd_start"] = args.start mgr.config["dnd_end"] = args.end mgr._save_config() - self._print_success(f"DND Window updated: {args.start} - {args.end}") + success(f"DND window updated: {args.start} → {args.end}") return 0 elif args.notify_action == "send": if not args.message: - self._print_error("Message required") + error("Message is required") return 1 - console.print("[dim]Sending notification...[/dim]") - mgr.send(args.title, args.message, level=args.level, actions=args.actions) + + with spinner("Sending notification"): + mgr.send( + args.title, + args.message, + level=args.level, + actions=args.actions, + ) + + success("Notification sent") return 0 else: - self._print_error("Unknown notify command") + error("Unknown notify command") return 1 # ------------------------------- @@ -275,28 +281,8 @@ def demo(self): return run_demo() def stack(self, args: argparse.Namespace) -> int: - """Handle `cortex stack` commands (list/describe/install/dry-run).""" try: manager = StackManager() - - # Validate --dry-run requires a stack name - if args.dry_run and not args.name: - self._print_error( - "--dry-run requires a stack name (e.g., `cortex stack ml --dry-run`)" - ) - return 1 - - # List stacks (default when no name/describe) - if args.list or (not args.name and not args.describe): - return self._handle_stack_list(manager) - - # Describe a specific stack - if args.describe: - return self._handle_stack_describe(manager, args.describe) - - # Install a stack (only remaining path) - return self._handle_stack_install(manager, args) - except FileNotFoundError as e: self._print_error(f"stacks.json not found. Ensure cortex/stacks.json exists: {e}") return 1 @@ -304,18 +290,17 @@ def stack(self, args: argparse.Namespace) -> int: self._print_error(f"stacks.json is invalid or malformed: {e}") return 1 - def _handle_stack_list(self, manager: StackManager) -> int: - """List all available stacks.""" - stacks = manager.list_stacks() - cx_print("\n📦 Available Stacks:\n", "info") - for stack in stacks: - pkg_count = len(stack.get("packages", [])) - console.print(f" [green]{stack.get('id', 'unknown')}[/green]") - console.print(f" {stack.get('name', 'Unnamed Stack')}") - console.print(f" {stack.get('description', 'No description')}") - console.print(f" [dim]({pkg_count} packages)[/dim]\n") - cx_print("Use: cortex stack to install a stack", "info") - return 0 + if args.dry_run and not args.name: + error("--dry-run requires a stack name (e.g., cortex stack ml --dry-run)") + return 1 + + if args.list or (not args.name and not args.describe): + return self._handle_stack_list(manager) + + if args.describe: + return self._handle_stack_describe(manager, args.describe) + + return self._handle_stack_install(manager, args) def _handle_stack_describe(self, manager: StackManager, stack_id: str) -> int: """Describe a specific stack.""" @@ -323,8 +308,62 @@ def _handle_stack_describe(self, manager: StackManager, stack_id: str) -> int: if not stack: self._print_error(f"Stack '{stack_id}' not found. Use --list to see available stacks.") return 1 + + section(f"STACK DETAILS: {stack_id}") + description = manager.describe_stack(stack_id) - console.print(description) + + status_box( + "STACK OVERVIEW", + { + "ID": stack.get("id", stack_id), + "Name": stack.get("name", "Unnamed Stack"), + "Packages": str(len(stack.get("packages", []))), + }, + ) + + from rich.panel import Panel + + console.print( + Panel( + description, + title="[bold]Description[/bold]", + border_style="cyan", + padding=(1, 2), + ) + ) + + return 0 + + def _handle_stack_list(self, manager: StackManager) -> int: + """List all available stacks.""" + stacks = manager.list_stacks() + + rows = [] + for stack in stacks: + rows.append( + [ + stack.get("id", "unknown"), + stack.get("name", "Unnamed Stack"), + stack.get("description", "No description"), + len(stack.get("packages", [])), + ] + ) + + section("AVAILABLE STACKS") + + data_table( + columns=[ + {"name": "ID", "style": "cyan"}, + {"name": "Name"}, + {"name": "Description"}, + {"name": "Packages", "justify": "right"}, + ], + rows=rows, + title="Pre-built Software Stacks", + ) + + info("Use: cortex stack to install a stack", badge=True) return 0 def _handle_stack_install(self, manager: StackManager, args: argparse.Namespace) -> int: @@ -333,9 +372,9 @@ def _handle_stack_install(self, manager: StackManager, args: argparse.Namespace) suggested_name = manager.suggest_stack(args.name) if suggested_name != original_name: - cx_print( - f"💡 No GPU detected, using '{suggested_name}' instead of '{original_name}'", - "info", + info( + f"No GPU detected. Using '{suggested_name}' instead of '{original_name}'", + badge=True, ) stack = manager.find_stack(suggested_name) @@ -357,17 +396,26 @@ def _handle_stack_install(self, manager: StackManager, args: argparse.Namespace) def _handle_stack_dry_run(self, stack: dict[str, Any], packages: list[str]) -> int: """Preview packages that would be installed without executing.""" - cx_print(f"\n📋 Stack: {stack['name']}", "info") - console.print("\nPackages that would be installed:") - for pkg in packages: - console.print(f" • {pkg}") - console.print(f"\nTotal: {len(packages)} packages") - cx_print("\nDry run only - no commands executed", "warning") + status_box( + f"DRY RUN: {stack['name']}", + { + "Packages": str(len(packages)), + "Mode": "Preview only", + }, + ) + + data_table( + columns=[{"name": "Package"}], + rows=[[pkg] for pkg in packages], + title="Packages to be Installed", + ) + + warning("Dry run only - no commands executed") return 0 def _handle_stack_real_install(self, stack: dict[str, Any], packages: list[str]) -> int: """Install all packages in the stack.""" - cx_print(f"\n🚀 Installing stack: {stack['name']}\n", "success") + section(f"INSTALLING STACK: {stack['name']}") # Batch into a single LLM request packages_str = " ".join(packages) @@ -377,8 +425,14 @@ def _handle_stack_real_install(self, stack: dict[str, Any], packages: list[str]) self._print_error(f"Failed to install stack '{stack['name']}'") return 1 - self._print_success(f"\n✅ Stack '{stack['name']}' installed successfully!") - console.print(f"Installed {len(packages)} packages") + summary_box( + "STACK INSTALLED", + [ + f"Name: {stack['name']}", + f"Packages installed: {len(packages)}", + ], + success=True, + ) return 0 # --- Sandbox Commands (Docker-based package testing) --- @@ -395,22 +449,34 @@ def sandbox(self, args: argparse.Namespace) -> int: action = getattr(args, "sandbox_action", None) if not action: - cx_print("\n🐳 Docker Sandbox - Test packages safely before installing\n", "info") - console.print("Usage: cortex sandbox [options]") - console.print("\nCommands:") - console.print(" create Create a sandbox environment") - console.print(" install Install package in sandbox") - console.print(" test [package] Run tests in sandbox") - console.print(" promote Install tested package on main system") - console.print(" cleanup Remove sandbox environment") - console.print(" list List all sandboxes") - console.print(" exec Execute command in sandbox") - console.print("\nExample workflow:") - console.print(" cortex sandbox create test-env") - console.print(" cortex sandbox install test-env nginx") - console.print(" cortex sandbox test test-env") - console.print(" cortex sandbox promote test-env nginx") - console.print(" cortex sandbox cleanup test-env") + section("DOCKER SANDBOX") + + status_box( + "SANDBOX OVERVIEW", + { + "Purpose": "Test packages safely before installing", + "Backend": "Docker", + }, + ) + + data_table( + columns=[ + {"name": "Command", "style": "cyan"}, + {"name": "Description"}, + ], + rows=[ + ["create ", "Create a sandbox environment"], + ["install ", "Install package in sandbox"], + ["test [pkg]", "Run tests in sandbox"], + ["promote ", "Install tested package on system"], + ["cleanup ", "Remove sandbox environment"], + ["list", "List all sandboxes"], + ["exec ", "Execute command in sandbox"], + ], + title="Available Commands", + ) + + info("Example: cortex sandbox create test-env", badge=True) return 0 try: @@ -436,11 +502,11 @@ def sandbox(self, args: argparse.Namespace) -> int: except DockerNotFoundError as e: self._print_error(str(e)) - cx_print("Docker is required only for sandbox commands.", "info") + info("Docker is required only for sandbox commands", badge=True) return 1 except SandboxNotFoundError as e: self._print_error(str(e)) - cx_print("Use 'cortex sandbox list' to see available sandboxes.", "info") + info("Use 'cortex sandbox list' to see available sandboxes", badge=True) return 1 except SandboxAlreadyExistsError as e: self._print_error(str(e)) @@ -451,35 +517,54 @@ def _sandbox_create(self, sandbox, args: argparse.Namespace) -> int: name = args.name image = getattr(args, "image", "ubuntu:22.04") - cx_print(f"Creating sandbox '{name}'...", "info") - result = sandbox.create(name, image=image) + section("CREATING SANDBOX") + + with spinner(f"Creating sandbox '{name}'"): + result = sandbox.create(name, image=image) if result.success: - cx_print(f"✓ Sandbox environment '{name}' created", "success") - console.print(f" [dim]{result.stdout}[/dim]") + summary_box( + "SANDBOX CREATED", + [ + f"Name: {name}", + f"Image: {image}", + ], + success=True, + ) return 0 - else: - self._print_error(result.message) - if result.stderr: - console.print(f" [red]{result.stderr}[/red]") - return 1 + + error(result.message) + if result.stderr: + console.print("[red]Details:[/red]") + console.print(result.stderr.strip(), style="red") + return 1 def _sandbox_install(self, sandbox, args: argparse.Namespace) -> int: """Install a package in sandbox.""" name = args.name package = args.package - cx_print(f"Installing '{package}' in sandbox '{name}'...", "info") - result = sandbox.install(name, package) + section("SANDBOX INSTALL") + + with spinner(f"Installing '{package}' in sandbox '{name}'"): + result = sandbox.install(name, package) if result.success: - cx_print(f"✓ {package} installed in sandbox", "success") + summary_box( + "PACKAGE INSTALLED", + [ + f"Sandbox: {name}", + f"Package: {package}", + ], + success=True, + ) return 0 - else: - self._print_error(result.message) - if result.stderr: - console.print(f" [dim]{result.stderr[:500]}[/dim]") - return 1 + + error(result.message) + if result.stderr: + console.print("[red]Details:[/red]") + console.print(result.stderr[:500].strip(), style="red") + return 1 def _sandbox_test(self, sandbox, args: argparse.Namespace) -> int: """Run tests in sandbox.""" @@ -488,29 +573,37 @@ def _sandbox_test(self, sandbox, args: argparse.Namespace) -> int: name = args.name package = getattr(args, "package", None) - cx_print(f"Running tests in sandbox '{name}'...", "info") - result = sandbox.test(name, package) + section("SANDBOX TESTS") - console.print() + with spinner(f"Running tests in sandbox '{name}'"): + result = sandbox.test(name, package) + + rows = [] for test in result.test_results: if test.result == SandboxTestStatus.PASSED: - console.print(f" ✓ {test.name}") - if test.message: - console.print(f" [dim]{test.message[:80]}[/dim]") + status = "[green]PASSED[/green]" elif test.result == SandboxTestStatus.FAILED: - console.print(f" ✗ {test.name}") - if test.message: - console.print(f" [red]{test.message}[/red]") + status = "[red]FAILED[/red]" else: - console.print(f" ⊘ {test.name} [dim](skipped)[/dim]") + status = "[yellow]SKIPPED[/yellow]" + + rows.append([test.name, status]) + + data_table( + columns=[ + {"name": "Test"}, + {"name": "Result", "justify": "center"}, + ], + rows=rows, + title="Test Results", + ) - console.print() if result.success: - cx_print("All tests passed", "success") + success("All tests passed") return 0 - else: - self._print_error("Some tests failed") - return 1 + + error("Some tests failed") + return 1 def _sandbox_promote(self, sandbox, args: argparse.Namespace) -> int: """Promote a tested package to main system.""" @@ -518,47 +611,50 @@ def _sandbox_promote(self, sandbox, args: argparse.Namespace) -> int: package = args.package dry_run = getattr(args, "dry_run", False) skip_confirm = getattr(args, "yes", False) + section("SANDBOX PROMOTION") if dry_run: - result = sandbox.promote(name, package, dry_run=True) - cx_print(f"Would run: sudo apt-get install -y {package}", "info") + info(f"Would run: sudo apt-get install -y {package}", badge=True) return 0 - # Confirm with user unless -y flag if not skip_confirm: - console.print(f"\nPromote '{package}' to main system? [Y/n]: ", end="") + console.print(f"[yellow]Promote '{package}' to main system?[/yellow] [Y/n]: ", end="") try: response = input().strip().lower() if response and response not in ("y", "yes"): - cx_print("Promotion cancelled", "warning") + warning("Promotion cancelled") return 0 except (EOFError, KeyboardInterrupt): console.print() - cx_print("Promotion cancelled", "warning") + warning("Promotion cancelled") return 0 - cx_print(f"Installing '{package}' on main system...", "info") - result = sandbox.promote(name, package, dry_run=False) + # sandbox.promote likely runs docker/remote operations; keep spinner for UX + with spinner(f"Installing '{package}' on main system"): + result = sandbox.promote(name, package, dry_run=False) if result.success: - cx_print(f"✓ {package} installed on main system", "success") + success(f"{package} installed on main system") return 0 - else: - self._print_error(result.message) - if result.stderr: - console.print(f" [red]{result.stderr[:500]}[/red]") - return 1 + + error(result.message) + if result.stderr: + console.print("[red]Details:[/red]") + console.print(result.stderr[:500].strip(), style="red") + return 1 def _sandbox_cleanup(self, sandbox, args: argparse.Namespace) -> int: """Remove a sandbox environment.""" name = args.name force = getattr(args, "force", False) - cx_print(f"Removing sandbox '{name}'...", "info") - result = sandbox.cleanup(name, force=force) + section("SANDBOX CLEANUP") + + with spinner(f"Removing sandbox '{name}'"): + result = sandbox.cleanup(name, force=force) if result.success: - cx_print(f"✓ Sandbox '{name}' removed", "success") + success(f"Sandbox '{name}' removed") return 0 else: self._print_error(result.message) @@ -569,19 +665,32 @@ def _sandbox_list(self, sandbox) -> int: sandboxes = sandbox.list_sandboxes() if not sandboxes: - cx_print("No sandbox environments found", "info") - cx_print("Create one with: cortex sandbox create ", "info") + info("No sandbox environments found", badge=True) + info("Create one with: cortex sandbox create ", badge=True) return 0 - cx_print("\n🐳 Sandbox Environments:\n", "info") + rows = [] for sb in sandboxes: - status_icon = "🟢" if sb.state.value == "running" else "⚪" - console.print(f" {status_icon} [green]{sb.name}[/green]") - console.print(f" Image: {sb.image}") - console.print(f" Created: {sb.created_at[:19]}") - if sb.packages: - console.print(f" Packages: {', '.join(sb.packages)}") - console.print() + rows.append( + [ + sb.name, + sb.image, + sb.state.value, + ", ".join(sb.packages) if sb.packages else "—", + ] + ) + + section("SANDBOX ENVIRONMENTS") + + data_table( + columns=[ + {"name": "Name", "style": "cyan"}, + {"name": "Image"}, + {"name": "State"}, + {"name": "Packages"}, + ], + rows=rows, + ) return 0 @@ -610,19 +719,42 @@ def ask(self, question: str) -> int: provider = self._get_provider() self._debug(f"Using provider: {provider}") + section("AI QUERY") + + status_box( + "REQUEST", + { + "Question": question, + "Provider": provider, + }, + ) + try: - handler = AskHandler( - api_key=api_key, - provider=provider, + with spinner("Thinking"): + handler = AskHandler( + api_key=api_key, + provider=provider, + ) + answer = handler.ask(question) + + section("ANSWER") + from rich.panel import Panel + + console.print( + Panel( + answer, + title="[bold]RESPONSE[/bold]", + border_style="cyan", + padding=(1, 2), + ) ) - answer = handler.ask(question) - console.print(answer) return 0 except ImportError as e: # Provide a helpful message if provider SDK is missing self._print_error(str(e)) - cx_print( - "Install the required SDK or set CORTEX_PROVIDER=ollama for local mode.", "info" + info( + "Install the required SDK or set CORTEX_PROVIDER=ollama for local mode.", + badge=True, ) return 1 except ValueError as e: @@ -640,9 +772,9 @@ def install( parallel: bool = False, ): # Validate input first - is_valid, error = validate_install_request(software) + is_valid, error_msg = validate_install_request(software) if not is_valid: - self._print_error(error) + self._print_error(error_msg) return 1 # Special-case the ml-cpu stack: @@ -664,7 +796,6 @@ def install( provider = self._get_provider() self._debug(f"Using provider: {provider}") - self._debug(f"API key: {api_key[:10]}...{api_key[-4:]}") # Initialize installation history history = InstallationHistory() @@ -672,22 +803,23 @@ def install( start_time = datetime.now() try: - self._print_status("🧠", "Understanding request...") - - interpreter = CommandInterpreter(api_key=api_key, provider=provider) - - self._print_status("📦", "Planning installation...") - - for _ in range(10): - self._animate_spinner("Analyzing system requirements...") - self._clear_line() + section("INSTALL REQUEST") + + status_box( + "REQUEST DETAILS", + { + "Software": software, + "Provider": provider, + "Mode": "Dry run" if dry_run else "Execute" if execute else "Plan only", + }, + ) - commands = interpreter.parse(f"install {software}") + with spinner("Understanding request and planning installation"): + interpreter = CommandInterpreter(api_key=api_key, provider=provider) + commands = interpreter.parse(f"install {software}") if not commands: - self._print_error( - "No commands generated. Please try again with a different request." - ) + self._print_error("No commands generated. Please try again.") return 1 # Extract packages from commands for tracking @@ -699,45 +831,53 @@ def install( InstallationType.INSTALL, packages, commands, start_time ) - self._print_status("⚙️", f"Installing {software}...") - print("\nGenerated commands:") - for i, cmd in enumerate(commands, 1): - print(f" {i}. {cmd}") + section("INSTALLATION PLAN") + + data_table( + columns=[ + {"name": "#", "justify": "right"}, + {"name": "Command"}, + ], + rows=[[i + 1, cmd] for i, cmd in enumerate(commands)], + title="Generated Commands", + ) if dry_run: - print("\n(Dry run mode - commands not executed)") + summary_box( + "DRY RUN COMPLETE", + [ + f"Commands generated: {len(commands)}", + "No commands were executed", + ], + success=True, + ) if install_id: history.update_installation(install_id, InstallationStatus.SUCCESS) return 0 - if execute: - - def progress_callback(current, total, step): - status_emoji = "⏳" - if step.status == StepStatus.SUCCESS: - status_emoji = "✅" - elif step.status == StepStatus.FAILED: - status_emoji = "❌" - print(f"\n[{current}/{total}] {status_emoji} {step.description}") - print(f" Command: {step.command}") + if not execute: + warning("Use --execute to run these commands") + info(f"Example: cortex install {software} --execute", badge=True) + return 0 - print("\nExecuting commands...") + section("EXECUTION") - if parallel: - import asyncio + if parallel: + import asyncio - from cortex.install_parallel import run_parallel_install + from cortex.install_parallel import run_parallel_install - def parallel_log_callback(message: str, level: str = "info"): - if level == "success": - cx_print(f" ✅ {message}", "success") - elif level == "error": - cx_print(f" ❌ {message}", "error") - else: - cx_print(f" ℹ {message}", "info") + def parallel_log_callback(message: str, level: str = "info"): + if level == "success": + success(message) + elif level == "error": + error(message) + else: + info(message) - try: - success, parallel_tasks = asyncio.run( + try: + with spinner("Executing installation in parallel"): + success_flag, _ = asyncio.run( run_parallel_install( commands=commands, descriptions=[f"Step {i + 1}" for i in range(len(commands))], @@ -746,134 +886,83 @@ def parallel_log_callback(message: str, level: str = "info"): log_callback=parallel_log_callback, ) ) + except Exception as e: + if install_id: + history.update_installation(install_id, InstallationStatus.FAILED, str(e)) + self._print_error(f"Parallel execution failed: {e}") + return 1 - total_duration = 0.0 - if parallel_tasks: - max_end = max( - (t.end_time for t in parallel_tasks if t.end_time is not None), - default=None, - ) - min_start = min( - (t.start_time for t in parallel_tasks if t.start_time is not None), - default=None, - ) - if max_end is not None and min_start is not None: - total_duration = max_end - min_start - - if success: - self._print_success(f"{software} installed successfully!") - print(f"\nCompleted in {total_duration:.2f} seconds (parallel mode)") - - if install_id: - history.update_installation(install_id, InstallationStatus.SUCCESS) - print(f"\n📝 Installation recorded (ID: {install_id})") - print(f" To rollback: cortex rollback {install_id}") + if not success_flag: + self._print_error("Installation failed") + return 1 - return 0 + summary_box( + "INSTALLATION COMPLETE", + [ + f"Software: {software}", + "Mode: Parallel", + ], + success=True, + ) - failed_tasks = [ - t for t in parallel_tasks if getattr(t.status, "value", "") == "failed" - ] - error_msg = failed_tasks[0].error if failed_tasks else "Installation failed" + if install_id: + history.update_installation(install_id, InstallationStatus.SUCCESS) + return 0 - if install_id: - history.update_installation( - install_id, - InstallationStatus.FAILED, - error_msg, - ) + total_steps = len(commands) + info("Installing packages", badge=True) - self._print_error("Installation failed") - if error_msg: - print(f" Error: {error_msg}", file=sys.stderr) - if install_id: - print(f"\n📝 Installation recorded (ID: {install_id})") - print(f" View details: cortex history {install_id}") - return 1 - - except (ValueError, OSError) as e: - if install_id: - history.update_installation( - install_id, InstallationStatus.FAILED, str(e) - ) - self._print_error(f"Parallel execution failed: {str(e)}") - return 1 - except Exception as e: - if install_id: - history.update_installation( - install_id, InstallationStatus.FAILED, str(e) - ) - self._print_error(f"Unexpected parallel execution error: {str(e)}") - if self.verbose: - import traceback + if any(cmd.strip().startswith("sudo ") for cmd in commands): + if os.system("sudo -v") != 0: + self._print_error("Sudo authentication failed or cancelled") + return 1 - traceback.print_exc() - return 1 + for idx, cmd in enumerate(commands, 1): + info(f"Step {idx}/{total_steps}", badge=True) + console.print(f" [dim]→ {cmd}[/dim]") coordinator = InstallationCoordinator( - commands=commands, - descriptions=[f"Step {i + 1}" for i in range(len(commands))], + commands=[cmd], + descriptions=[f"Step {idx}"], timeout=300, stop_on_error=True, - progress_callback=progress_callback, ) result = coordinator.execute() - if result.success: - self._print_success(f"{software} installed successfully!") - print(f"\nCompleted in {result.total_duration:.2f} seconds") - - # Record successful installation - if install_id: - history.update_installation(install_id, InstallationStatus.SUCCESS) - print(f"\n📝 Installation recorded (ID: {install_id})") - print(f" To rollback: cortex rollback {install_id}") + if not result.success: + self._print_error(f"Failed at step {idx}") + if result.error_message: + console.print(result.error_message, style="red") - return 0 - else: - # Record failed installation if install_id: - error_msg = result.error_message or "Installation failed" history.update_installation( - install_id, InstallationStatus.FAILED, error_msg + install_id, + InstallationStatus.FAILED, + result.error_message or "Installation failed", ) - - if result.failed_step is not None: - self._print_error(f"Installation failed at step {result.failed_step + 1}") - else: - self._print_error("Installation failed") - if result.error_message: - print(f" Error: {result.error_message}", file=sys.stderr) - if install_id: - print(f"\n📝 Installation recorded (ID: {install_id})") - print(f" View details: cortex history {install_id}") return 1 - else: - print("\nTo execute these commands, run with --execute flag") - print("Example: cortex install docker --execute") - return 0 + duration = (datetime.now() - start_time).total_seconds() + + summary_box( + "INSTALLATION COMPLETE", + [ + f"Software: {software}", + f"Duration: {duration:.2f}s", + ], + success=True, + ) - except ValueError as e: - if install_id: - history.update_installation(install_id, InstallationStatus.FAILED, str(e)) - self._print_error(str(e)) - return 1 - except RuntimeError as e: - if install_id: - history.update_installation(install_id, InstallationStatus.FAILED, str(e)) - self._print_error(f"API call failed: {str(e)}") - return 1 - except OSError as e: if install_id: - history.update_installation(install_id, InstallationStatus.FAILED, str(e)) - self._print_error(f"System error: {str(e)}") - return 1 + history.update_installation(install_id, InstallationStatus.SUCCESS) + + return 0 + except Exception as e: if install_id: history.update_installation(install_id, InstallationStatus.FAILED, str(e)) - self._print_error(f"Unexpected error: {str(e)}") + self._print_error(f"Unexpected error: {e}") if self.verbose: import traceback @@ -888,11 +977,17 @@ def cache_stats(self) -> int: stats = cache.stats() hit_rate = f"{stats.hit_rate * 100:.1f}%" if stats.total else "0.0%" - cx_header("Cache Stats") - cx_print(f"Hits: {stats.hits}", "info") - cx_print(f"Misses: {stats.misses}", "info") - cx_print(f"Hit rate: {hit_rate}", "info") - cx_print(f"Saved calls (approx): {stats.hits}", "info") + section("CACHE STATISTICS") + + status_box( + "SEMANTIC CACHE", + { + "Hits": str(stats.hits), + "Misses": str(stats.misses), + "Hit rate": hit_rate, + "Saved calls (approx)": str(stats.hits), + }, + ) return 0 except (ImportError, OSError) as e: self._print_error(f"Unable to read cache stats: {e}") @@ -918,52 +1013,97 @@ def history(self, limit: int = 20, status: str | None = None, show_id: str | Non self._print_error(f"Installation {show_id} not found") return 1 - print(f"\nInstallation Details: {record.id}") - print("=" * 60) - print(f"Timestamp: {record.timestamp}") - print(f"Operation: {record.operation_type.value}") - print(f"Status: {record.status.value}") - if record.duration_seconds: - print(f"Duration: {record.duration_seconds:.2f}s") + section("INSTALLATION DETAILS") + + status_value = record.status.value + if status_value.lower() == "success": + status_display = "[green]SUCCESS[/green]" + elif status_value.lower() == "failed": + status_display = "[red]FAILED[/red]" else: - print("Duration: N/A") - print(f"\nPackages: {', '.join(record.packages)}") + status_display = status_value.upper() + + status_box( + f"INSTALL ID: {record.id}", + { + "Timestamp": record.timestamp, + "Operation": record.operation_type.value, + "Status": status_display, + "Duration": ( + f"{record.duration_seconds:.2f}s" if record.duration_seconds else "N/A" + ), + "Rollback available": str(record.rollback_available), + }, + ) - if record.error_message: - print(f"\nError: {record.error_message}") + if record.packages: + data_table( + columns=[{"name": "Packages"}], + rows=[[pkg] for pkg in record.packages], + title="Installed Packages", + ) if record.commands_executed: - print("\nCommands executed:") - for cmd in record.commands_executed: - print(f" {cmd}") + data_table( + columns=[{"name": "Commands Executed"}], + rows=[[cmd] for cmd in record.commands_executed], + title="Commands", + ) + + if record.error_message: + warning(record.error_message) - print(f"\nRollback available: {record.rollback_available}") return 0 - else: # List history - status_filter = InstallationStatus(status) if status else None - records = history.get_history(limit, status_filter) + status_filter = InstallationStatus(status) if status else None + records = history.get_history(limit, status_filter) - if not records: - print("No installation records found.") - return 0 + if not records: + info("No installation records found", badge=True) + return 0 - print( - f"\n{'ID':<18} {'Date':<20} {'Operation':<12} {'Packages':<30} {'Status':<15}" + rows = [] + for r in records: + date = r.timestamp[:19].replace("T", " ") + packages = ", ".join(r.packages[:2]) + if len(r.packages) > 2: + packages += f" +{len(r.packages) - 2}" + + status_value = r.status.value + if status_value.lower() == "success": + status_display = "[green]SUCCESS[/green]" + elif status_value.lower() == "failed": + status_display = "[red]FAILED[/red]" + else: + status_display = status_value.upper() + + rows.append( + [ + r.id, + date, + r.operation_type.value, + packages, + status_display, + ] ) - print("=" * 100) - for r in records: - date = r.timestamp[:19].replace("T", " ") - packages = ", ".join(r.packages[:2]) - if len(r.packages) > 2: - packages += f" +{len(r.packages) - 2}" + section("INSTALLATION HISTORY") + + data_table( + columns=[ + {"name": "ID", "style": "cyan"}, + {"name": "Date"}, + {"name": "Operation"}, + {"name": "Packages"}, + {"name": "Status", "justify": "center"}, + ], + rows=rows, + title=f"Last {len(rows)} Installations", + ) - print( - f"{r.id:<18} {date:<20} {r.operation_type.value:<12} {packages:<30} {r.status.value:<15}" - ) + info("Use: cortex history to view details", badge=True) + return 0 - return 0 except (ValueError, OSError) as e: self._print_error(f"Failed to retrieve history: {str(e)}") return 1 @@ -980,14 +1120,22 @@ def rollback(self, install_id: str, dry_run: bool = False): history = InstallationHistory() try: - success, message = history.rollback(install_id, dry_run) + success_flag, message = history.rollback(install_id, dry_run) + + section("ROLLBACK") if dry_run: - print("\nRollback actions (dry run):") - print(message) + status_box( + f"DRY RUN: {install_id}", + { + "Action": "Rollback preview", + "Details": message, + }, + ) return 0 - elif success: - self._print_success(message) + + if success_flag: + success(message) return 0 else: self._print_error(message) @@ -1003,24 +1151,30 @@ def rollback(self, install_id: str, dry_run: bool = False): traceback.print_exc() return 1 - def status(self): + def status(self) -> int: """Show comprehensive system status and run health checks""" from cortex.doctor import SystemDoctor - # Run the comprehensive system health checks - # This now includes all functionality from the old status command - # plus all the detailed health checks from doctor + section("SYSTEM STATUS") + + # Run health checks (spinner handled inside SystemDoctor) doctor = SystemDoctor() - return doctor.run_checks() + result = doctor.run_checks() + + if isinstance(result, int): + return result + if hasattr(result, "success"): + return 0 if result.success else 1 + return 0 def wizard(self): """Interactive setup wizard for API key configuration""" show_banner() console.print() - cx_print("Welcome to Cortex Setup Wizard!", "success") + section("SETUP WIZARD") + success("Welcome to Cortex Setup Wizard!") console.print() - # (Simplified for brevity - keeps existing logic) - cx_print("Please export your API key in your shell profile.", "info") + info("Please export your API key in your shell profile.", badge=True) return 0 def env(self, args: argparse.Namespace) -> int: @@ -1032,7 +1186,8 @@ def env(self, args: argparse.Namespace) -> int: if not action: self._print_error( - "Please specify a subcommand (set/get/list/delete/export/import/clear/template/audit/check/path)" + "Please specify a subcommand " + "(set/get/list/delete/export/import/clear/template/audit/check/path)" ) return 1 @@ -1088,19 +1243,26 @@ def _env_set(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> int description = getattr(args, "description", "") or "" try: - env_mgr.set_variable( - app=app, - key=key, - value=value, - encrypt=encrypt, - var_type=var_type, - description=description, - ) + with spinner(f"Setting variable '{key}' for app '{app}'"): + env_mgr.set_variable( + app=app, + key=key, + value=value, + encrypt=encrypt, + var_type=var_type, + description=description, + ) - if encrypt: - cx_print("🔐 Variable encrypted and stored", "success") - else: - cx_print("✓ Environment variable set", "success") + status_box( + "VARIABLE SAVED", + { + "App": app, + "Key": key, + "Encrypted": "[yellow]yes[/yellow]" if encrypt else "[green]no[/green]", + "Type": var_type, + }, + border_color="green", + ) return 0 except ValueError as e: @@ -1109,7 +1271,7 @@ def _env_set(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> int except ImportError as e: self._print_error(str(e)) if "cryptography" in str(e).lower(): - cx_print("Install with: pip install cryptography", "info") + info("Install with: pip install cryptography", badge=True) return 1 def _env_get(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> int: @@ -1126,10 +1288,19 @@ def _env_get(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> int var_info = env_mgr.get_variable_info(app, key) - if var_info and var_info.encrypted and not show_encrypted: - console.print(f"{key}: [dim][encrypted][/dim]") - else: - console.print(f"{key}: {value}") + encrypted = var_info.encrypted if var_info else False + + display_value = "[dim][encrypted][/dim]" if encrypted and not show_encrypted else str(value) + + section("ENV VARIABLE") + + status_box( + f"{app} • {key}", + { + "Value": display_value, + "Encrypted": "[yellow]yes[/yellow]" if encrypted else "[green]no[/green]", + }, + ) return 0 @@ -1141,29 +1312,46 @@ def _env_list(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> in variables = env_mgr.list_variables(app) if not variables: - cx_print(f"No environment variables set for '{app}'", "info") + info(f"No environment variables set for '{app}'", badge=True) return 0 - cx_header(f"Environment: {app}") + section(f"ENVIRONMENT: {app}") + rows = [] for var in sorted(variables, key=lambda v: v.key): if var.encrypted: if show_encrypted: try: value = env_mgr.get_variable(app, var.key, decrypt=True) - console.print(f" {var.key}: {value} [dim](decrypted)[/dim]") except ValueError: - console.print(f" {var.key}: [red][decryption failed][/red]") + value = "[red]decryption failed[/red]" else: - console.print(f" {var.key}: [yellow][encrypted][/yellow]") + value = "[dim][encrypted][/dim]" else: - console.print(f" {var.key}: {var.value}") + value = var.value + + rows.append( + [ + var.key, + value, + "[yellow]yes[/yellow]" if var.encrypted else "[green]no[/green]", + var.description or "", + ] + ) - if var.description: - console.print(f" [dim]# {var.description}[/dim]") + data_table( + columns=[ + {"name": "Key", "style": "cyan"}, + {"name": "Value"}, + {"name": "Encrypted", "justify": "center"}, + {"name": "Description"}, + ], + rows=rows, + title=f"Variables ({len(rows)})", + expand=True, + ) - console.print() - console.print(f"[dim]Total: {len(variables)} variable(s)[/dim]") + info(f"Total variables: {len(rows)}", badge=True) return 0 def _env_delete(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> int: @@ -1171,12 +1359,15 @@ def _env_delete(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> app = args.app key = args.key - if env_mgr.delete_variable(app, key): - cx_print(f"✓ Deleted '{key}' from '{app}'", "success") + with spinner(f"Deleting variable '{key}' from '{app}'"): + deleted = env_mgr.delete_variable(app, key) + + if deleted: + success(f"Deleted '{key}' from '{app}'") return 0 - else: - self._print_error(f"Variable '{key}' not found for app '{app}'") - return 1 + + self._print_error(f"Variable '{key}' not found for app '{app}'") + return 1 def _env_export(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> int: """Export environment variables to .env format.""" @@ -1187,14 +1378,14 @@ def _env_export(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> content = env_mgr.export_env(app, include_encrypted=include_encrypted) if not content: - cx_print(f"No environment variables to export for '{app}'", "info") + info(f"No environment variables to export for '{app}'", badge=True) return 0 if output_file: try: with open(output_file, "w", encoding="utf-8") as f: f.write(content) - cx_print(f"✓ Exported to {output_file}", "success") + success(f"Exported environment to {output_file}") except OSError as e: self._print_error(f"Failed to write file: {e}") return 1 @@ -1206,7 +1397,7 @@ def _env_export(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> def _env_import(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> int: """Import environment variables from .env format.""" - import sys + import sys as _sys app = args.app input_file = getattr(args, "file", None) @@ -1216,12 +1407,12 @@ def _env_import(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> if input_file: with open(input_file, encoding="utf-8") as f: content = f.read() - elif not sys.stdin.isatty(): - content = sys.stdin.read() + elif not _sys.stdin.isatty(): + content = _sys.stdin.read() else: self._print_error("No input file specified and stdin is empty") - cx_print("Usage: cortex env import ", "info") - cx_print(" or: cat .env | cortex env import ", "info") + info("Usage: cortex env import ", badge=True) + info(" or: cat .env | cortex env import ", badge=True) return 1 # Parse encrypt-keys argument @@ -1229,16 +1420,16 @@ def _env_import(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> if encrypt_keys: encrypt_list = [k.strip() for k in encrypt_keys.split(",")] - count, errors = env_mgr.import_env(app, content, encrypt_keys=encrypt_list) + with spinner("Importing environment variables"): + count, errors = env_mgr.import_env(app, content, encrypt_keys=encrypt_list) - if errors: - for err in errors: - cx_print(f" ⚠ {err}", "warning") + for err in errors: + warning(err) if count > 0: - cx_print(f"✓ Imported {count} variable(s) to '{app}'", "success") + success(f"Imported {count} variable(s) to '{app}'") else: - cx_print("No variables imported", "info") + info("No variables imported", badge=True) # Return success (0) even with partial errors - some vars imported successfully return 0 @@ -1259,13 +1450,16 @@ def _env_clear(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> i if not force: confirm = input(f"⚠️ Clear ALL environment variables for '{app}'? (y/n): ") if confirm.lower() != "y": - cx_print("Operation cancelled", "info") + info("Operation cancelled", badge=True) return 0 - if env_mgr.clear_app(app): - cx_print(f"✓ Cleared all variables for '{app}'", "success") + with spinner(f"Clearing environment for '{app}'"): + cleared = env_mgr.clear_app(app) + + if cleared: + success(f"Cleared all variables for '{app}'") else: - cx_print(f"No environment data found for '{app}'", "info") + info(f"No environment data found for '{app}'", badge=True) return 0 @@ -1289,15 +1483,23 @@ def _env_template_list(self, env_mgr: EnvironmentManager) -> int: """List available templates.""" templates = env_mgr.list_templates() - cx_header("Available Environment Templates") + section("ENV TEMPLATES") + rows = [] for template in sorted(templates, key=lambda t: t.name): - console.print(f" [green]{template.name}[/green]") - console.print(f" {template.description}") - console.print(f" [dim]{len(template.variables)} variables[/dim]") - console.print() + rows.append([template.name, template.description, len(template.variables)]) + + data_table( + columns=[ + {"name": "Name", "style": "cyan"}, + {"name": "Description"}, + {"name": "Variables", "justify": "right"}, + ], + rows=rows, + title="Available Environment Templates", + ) - cx_print("Use 'cortex env template show ' for details", "info") + info("Use 'cortex env template show ' for details", badge=True) return 0 def _env_template_show(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> int: @@ -1309,20 +1511,35 @@ def _env_template_show(self, env_mgr: EnvironmentManager, args: argparse.Namespa self._print_error(f"Template '{template_name}' not found") return 1 - cx_header(f"Template: {template.name}") - console.print(f" {template.description}") - console.print() + section(f"TEMPLATE: {template.name}") + + status_box( + "TEMPLATE INFO", + { + "Description": template.description, + "Variables": str(len(template.variables)), + }, + ) - console.print("[bold]Variables:[/bold]") + rows = [] for var in template.variables: - req = "[red]*[/red]" if var.required else " " + req = "[red]*[/red]" if var.required else "" default = f" = {var.default}" if var.default else "" - console.print(f" {req} [cyan]{var.name}[/cyan] ({var.var_type}){default}") - if var.description: - console.print(f" [dim]{var.description}[/dim]") + rows.append([req, var.name, var.var_type, var.description or "", default]) + + data_table( + columns=[ + {"name": "Req"}, + {"name": "Name", "style": "cyan"}, + {"name": "Type"}, + {"name": "Description"}, + {"name": "Default"}, + ], + rows=rows, + title="Template Variables", + ) - console.print() - console.print("[dim]* = required[/dim]") + info("* = required", badge=False) return 0 def _env_template_apply(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> int: @@ -1344,34 +1561,46 @@ def _env_template_apply(self, env_mgr: EnvironmentManager, args: argparse.Namesp if encrypt_arg: encrypt_keys = [k.strip() for k in encrypt_arg.split(",")] - result = env_mgr.apply_template( - template_name=template_name, - app=app, - values=values, - encrypt_keys=encrypt_keys, - ) + with spinner(f"Applying template '{template_name}' to '{app}'"): + result = env_mgr.apply_template( + template_name=template_name, + app=app, + values=values, + encrypt_keys=encrypt_keys, + ) if result.valid: - cx_print(f"✓ Applied template '{template_name}' to '{app}'", "success") + success(f"Applied template '{template_name}' to '{app}'") return 0 - else: - self._print_error(f"Failed to apply template '{template_name}'") - for err in result.errors: - console.print(f" [red]✗[/red] {err}") - return 1 + + self._print_error(f"Failed to apply template '{template_name}'") + for err in result.errors: + warning(err) + return 1 def _env_list_apps(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> int: """List all apps with stored environments.""" apps = env_mgr.list_apps() if not apps: - cx_print("No applications with stored environments", "info") + info("No applications with stored environments", badge=True) return 0 - cx_header("Applications with Environments") + section("APPLICATIONS WITH ENVIRONMENTS") + + rows = [] for app in apps: var_count = len(env_mgr.list_variables(app)) - console.print(f" [green]{app}[/green] [dim]({var_count} variables)[/dim]") + rows.append([app, var_count]) + + data_table( + columns=[ + {"name": "Application", "style": "cyan"}, + {"name": "Variables", "justify": "right"}, + ], + rows=rows, + title="Apps", + ) return 0 @@ -1379,12 +1608,13 @@ def _env_load(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> in """Load environment variables into current process.""" app = args.app - count = env_mgr.load_to_environ(app) + with spinner(f"Loading variables from '{app}' into environment"): + count = env_mgr.load_to_environ(app) if count > 0: - cx_print(f"✓ Loaded {count} variable(s) from '{app}' into environment", "success") + success(f"Loaded {count} variable(s) from '{app}' into environment") else: - cx_print(f"No variables to load for '{app}'", "info") + info(f"No variables to load for '{app}'", badge=True) return 0 @@ -1784,11 +2014,10 @@ def import_deps(self, args: argparse.Namespace) -> int: if scan_all: return self._import_all(importer, execute, include_dev) - # Handle single file import if not file_path: self._print_error("Please specify a dependency file or use --all to scan directory") - cx_print("Usage: cortex import [--execute] [--dev]", "info") - cx_print(" cortex import --all [--execute] [--dev]", "info") + info("Usage: cortex import [--execute] [--dev]", badge=True) + info(" cortex import --all [--execute] [--dev]", badge=True) return 1 return self._import_single_file(importer, file_path, execute, include_dev) @@ -1803,12 +2032,12 @@ def _import_single_file( self._display_parse_result(result, include_dev) if result.errors: - for error in result.errors: - self._print_error(error) + for err in result.errors: + self._print_error(err) return 1 if not result.packages and not result.dev_packages: - cx_print("No packages found in file", "info") + info("No packages found in file", badge=True) return 0 # Get install command @@ -1819,24 +2048,30 @@ def _import_single_file( # Dry run mode (default) if not execute: - console.print(f"\n[bold]Install command:[/bold] {install_cmd}") - cx_print("\nTo install these packages, run with --execute flag", "info") - cx_print(f"Example: cortex import {file_path} --execute", "info") + status_box( + "INSTALL PREVIEW", + { + "Ecosystem": result.ecosystem.value, + "Command": install_cmd, + }, + ) + info("Run again with --execute to install", badge=True) return 0 # Execute mode - run the install command return self._execute_install(install_cmd, result.ecosystem) def _import_all(self, importer: DependencyImporter, execute: bool, include_dev: bool) -> int: - """Scan directory and import all dependency files.""" - cx_print("Scanning directory...", "info") + section("DEPENDENCY SCAN") - results = importer.scan_directory(include_dev=include_dev) + with spinner("Scanning directory for dependency files"): + results = importer.scan_directory(include_dev=include_dev) if not results: - cx_print("No dependency files found in current directory", "info") + info("No dependency files found in current directory", badge=True) return 0 + rows = [] # Display all found files total_packages = 0 total_dev_packages = 0 @@ -1844,50 +2079,57 @@ def _import_all(self, importer: DependencyImporter, execute: bool, include_dev: for file_path, result in results.items(): filename = os.path.basename(file_path) if result.errors: - console.print(f" [red]✗[/red] {filename} (error: {result.errors[0]})") + rows.append([filename, "ERROR", result.errors[0]]) else: pkg_count = result.prod_count dev_count = result.dev_count if include_dev else 0 total_packages += pkg_count total_dev_packages += dev_count - dev_str = f" + {dev_count} dev" if dev_count > 0 else "" - console.print(f" [green]✓[/green] {filename} ({pkg_count} packages{dev_str})") + rows.append( + [ + filename, + f"{pkg_count}", + f"+{dev_count}" if dev_count > 0 else "—", + ] + ) - console.print() + data_table( + columns=[ + {"name": "File", "style": "cyan"}, + {"name": "Packages"}, + {"name": "Dev"}, + ], + rows=rows, + title="Detected Dependency Files", + ) if total_packages == 0 and total_dev_packages == 0: - cx_print("No packages found in dependency files", "info") + info("No packages found in dependency files", badge=True) return 0 - # Generate install commands commands = importer.get_install_commands_for_results(results) - if not commands: - cx_print("No install commands generated", "info") + info("No install commands generated", badge=True) return 0 - # Dry run mode (default) if not execute: - console.print("[bold]Install commands:[/bold]") - for cmd_info in commands: - console.print(f" • {cmd_info['command']}") - console.print() - cx_print("To install all packages, run with --execute flag", "info") - cx_print("Example: cortex import --all --execute", "info") + data_table( + columns=[{"name": "Install Command"}], + rows=[[c["command"]] for c in commands], + title="Install Commands", + ) + info("Run with --execute to install all packages", badge=True) return 0 - # Execute mode - confirm before installing total = total_packages + total_dev_packages confirm = input(f"\nInstall all {total} packages? [Y/n]: ") - if confirm.lower() not in ["", "y", "yes"]: - cx_print("Installation cancelled", "info") + if confirm.lower() not in ("", "y", "yes"): + info("Installation cancelled", badge=True) return 0 - # Execute all install commands return self._execute_multi_install(commands) def _display_parse_result(self, result: ParseResult, include_dev: bool) -> None: - """Display the parsed packages from a dependency file.""" ecosystem_names = { PackageEcosystem.PYTHON: "Python", PackageEcosystem.NODE: "Node", @@ -1897,33 +2139,42 @@ def _display_parse_result(self, result: ParseResult, include_dev: bool) -> None: } ecosystem_name = ecosystem_names.get(result.ecosystem, "Unknown") - filename = os.path.basename(result.file_path) - cx_print(f"\n📋 Found {result.prod_count} {ecosystem_name} packages", "info") + section("DEPENDENCY PARSE") + + status_box( + "SUMMARY", + { + "Ecosystem": ecosystem_name, + "Packages": str(result.prod_count), + "Dev packages": str(result.dev_count if include_dev else 0), + }, + ) if result.packages: - console.print("\n[bold]Packages:[/bold]") - for pkg in result.packages[:15]: # Show first 15 - version_str = f" ({pkg.version})" if pkg.version else "" - console.print(f" • {pkg.name}{version_str}") - if len(result.packages) > 15: - console.print(f" [dim]... and {len(result.packages) - 15} more[/dim]") + data_table( + columns=[{"name": "Package"}], + rows=[ + [f"{p.name}{f' ({p.version})' if p.version else ''}"] + for p in result.packages[:15] + ], + title="Packages", + ) if include_dev and result.dev_packages: - console.print(f"\n[bold]Dev packages:[/bold] ({result.dev_count})") - for pkg in result.dev_packages[:10]: - version_str = f" ({pkg.version})" if pkg.version else "" - console.print(f" • {pkg.name}{version_str}") - if len(result.dev_packages) > 10: - console.print(f" [dim]... and {len(result.dev_packages) - 10} more[/dim]") - - if result.warnings: - console.print() - for warning in result.warnings: - cx_print(f"⚠ {warning}", "warning") + data_table( + columns=[{"name": "Dev Package"}], + rows=[ + [f"{p.name}{f' ({p.version})' if p.version else ''}"] + for p in result.dev_packages[:10] + ], + title="Dev Packages", + ) + + for w in result.warnings: + warning(w) def _execute_install(self, command: str, ecosystem: PackageEcosystem) -> int: - """Execute a single install command.""" ecosystem_names = { PackageEcosystem.PYTHON: "Python", PackageEcosystem.NODE: "Node", @@ -1933,73 +2184,83 @@ def _execute_install(self, command: str, ecosystem: PackageEcosystem) -> int: } ecosystem_name = ecosystem_names.get(ecosystem, "") - cx_print(f"\n✓ Installing {ecosystem_name} packages...", "success") - - def progress_callback(current: int, total: int, step: InstallationStep) -> None: - status_emoji = "⏳" - if step.status == StepStatus.SUCCESS: - status_emoji = "✅" - elif step.status == StepStatus.FAILED: - status_emoji = "❌" - console.print(f"[{current}/{total}] {status_emoji} {step.description}") - - coordinator = InstallationCoordinator( - commands=[command], - descriptions=[f"Install {ecosystem_name} packages"], - timeout=600, # 10 minutes for package installation - stop_on_error=True, - progress_callback=progress_callback, - ) - result = coordinator.execute() + section("INSTALLING DEPENDENCIES") + + with progress_bar() as progress: + task = progress.add_task("Installing", total=1) + + def progress_callback(current, total, step: InstallationStep): + if step.status == StepStatus.RUNNING: + info(f"Step {current}/{total}", badge=True) + elif step.status == StepStatus.FAILED: + error(f"Step {current}/{total} failed") + + coordinator = InstallationCoordinator( + commands=[command], + descriptions=[f"Install {ecosystem_name} packages"], + timeout=600, + stop_on_error=True, + progress_callback=progress_callback, + ) + + result = coordinator.execute() + # Explicitly show commands as they are about to run + for idx, cmd in enumerate(command, 1): + info(f"Step {idx}/{len(command)}", badge=True) + console.print(f" [dim]→ {cmd}[/dim]") if result.success: - self._print_success(f"{ecosystem_name} packages installed successfully!") - console.print(f"Completed in {result.total_duration:.2f} seconds") + summary_box( + "INSTALL COMPLETE", + [ + f"Ecosystem: {ecosystem_name}", + f"Duration: {result.total_duration:.2f}s", + ], + success=True, + ) return 0 - else: - self._print_error("Installation failed") - if result.error_message: - console.print(f"Error: {result.error_message}", style="red") - return 1 + + self._print_error("Installation failed") + if result.error_message: + console.print(result.error_message, style="red") + return 1 def _execute_multi_install(self, commands: list[dict[str, str]]) -> int: - """Execute multiple install commands.""" all_commands = [cmd["command"] for cmd in commands] all_descriptions = [cmd["description"] for cmd in commands] - def progress_callback(current: int, total: int, step: InstallationStep) -> None: - status_emoji = "⏳" - if step.status == StepStatus.SUCCESS: - status_emoji = "✅" - elif step.status == StepStatus.FAILED: - status_emoji = "❌" - console.print(f"\n[{current}/{total}] {status_emoji} {step.description}") - console.print(f" Command: {step.command}") - - coordinator = InstallationCoordinator( - commands=all_commands, - descriptions=all_descriptions, - timeout=600, - stop_on_error=True, - progress_callback=progress_callback, - ) + section("INSTALLING ALL DEPENDENCIES") + + with progress_bar() as progress: + task = progress.add_task("Installing", total=len(all_commands)) + + coordinator = InstallationCoordinator( + commands=all_commands, + descriptions=all_descriptions, + timeout=600, + stop_on_error=True, + progress_callback=lambda c, t, s: progress.advance(task, 1), + ) - console.print("\n[bold]Installing packages...[/bold]") - result = coordinator.execute() + result = coordinator.execute() if result.success: - self._print_success("\nAll packages installed successfully!") - console.print(f"Completed in {result.total_duration:.2f} seconds") + summary_box( + "ALL DEPENDENCIES INSTALLED", + [f"Steps executed: {len(all_commands)}"], + success=True, + ) return 0 + + if result.failed_step is not None: + self._print_error(f"Installation failed at step {result.failed_step + 1}") else: - if result.failed_step is not None: - self._print_error(f"\nInstallation failed at step {result.failed_step + 1}") - else: - self._print_error("\nInstallation failed") - if result.error_message: - console.print(f"Error: {result.error_message}", style="red") - return 1 + self._print_error("Installation failed") + + if result.error_message: + console.print(result.error_message, style="red") + return 1 # -------------------------- @@ -2011,40 +2272,43 @@ def show_rich_help(): for all core Cortex utilities including installation, environment management, and container tools. """ - from rich.table import Table - show_banner(show_version=True) console.print() - console.print("[bold]AI-powered package manager for Linux[/bold]") - console.print("[dim]Just tell Cortex what you want to install.[/dim]") - console.print() + section("CORTEX CLI") - # Initialize a table to display commands with specific column styling - table = Table(show_header=True, header_style="bold cyan", box=None) - table.add_column("Command", style="green") - table.add_column("Description") - - # Command Rows - table.add_row("ask ", "Ask about your system") - table.add_row("demo", "See Cortex in action") - table.add_row("wizard", "Configure API key") - table.add_row("status", "System status") - table.add_row("install ", "Install software") - table.add_row("import ", "Import deps from package files") - table.add_row("history", "View history") - table.add_row("rollback ", "Undo installation") - table.add_row("notify", "Manage desktop notifications") - table.add_row("env", "Manage environment variables") - table.add_row("cache stats", "Show LLM cache statistics") - table.add_row("stack ", "Install the stack") - table.add_row("docker permissions", "Fix Docker bind-mount permissions") - table.add_row("sandbox ", "Test packages in Docker sandbox") - table.add_row("doctor", "System health check") - - console.print(table) - console.print() - console.print("[dim]Learn more: https://cortexlinux.com/docs[/dim]") + status_box( + "OVERVIEW", + { + "Description": "AI-powered package manager for Linux", + "Usage": "Tell Cortex what you want to install", + }, + ) + + data_table( + columns=[ + {"name": "Command", "style": "green"}, + {"name": "Description"}, + ], + rows=[ + ["ask ", "Ask questions about your system"], + ["demo", "See Cortex in action"], + ["wizard", "Configure API key"], + ["status", "System status and health checks"], + ["install ", "Install software"], + ["import ", "Import dependencies from package files"], + ["history", "View installation history"], + ["rollback ", "Undo an installation"], + ["notify", "Manage desktop notifications"], + ["env", "Manage environment variables"], + ["cache stats", "Show LLM cache statistics"], + ["stack ", "Install a predefined stack"], + ["sandbox ", "Test packages in a Docker sandbox"], + ], + title="AVAILABLE COMMANDS", + ) + + info("Learn more: https://cortexlinux.com/", badge=True) def shell_suggest(text: str) -> int: @@ -2091,7 +2355,7 @@ def main(): except Exception as e: # Network config is optional - don't block execution if it fails - console.print(f"[yellow]⚠️ Network auto-config failed: {e}[/yellow]") + warning(f"Network auto-config failed: {e}") parser = argparse.ArgumentParser( prog="cortex", @@ -2105,32 +2369,11 @@ def main(): subparsers = parser.add_subparsers(dest="command", help="Available commands") - # Define the docker command and its associated sub-actions - docker_parser = subparsers.add_parser("docker", help="Docker and container utilities") - docker_subs = docker_parser.add_subparsers(dest="docker_action", help="Docker actions") - - # Add the permissions action to allow fixing file ownership issues - perm_parser = docker_subs.add_parser( - "permissions", help="Fix file permissions from bind mounts" - ) - - # Provide an option to skip the manual confirmation prompt - perm_parser.add_argument("--yes", "-y", action="store_true", help="Skip confirmation prompt") + subparsers.add_parser("demo", help="See Cortex in action") + subparsers.add_parser("wizard", help="Configure API key interactively") - perm_parser.add_argument( - "--execute", "-e", action="store_true", help="Apply ownership changes (default: dry-run)" - ) - - # Demo command - demo_parser = subparsers.add_parser("demo", help="See Cortex in action") - - # Wizard command - wizard_parser = subparsers.add_parser("wizard", help="Configure API key interactively") - - # Status command (includes comprehensive health checks) subparsers.add_parser("status", help="Show comprehensive system status and health checks") - # Ask command ask_parser = subparsers.add_parser("ask", help="Ask a question about your system") ask_parser.add_argument("question", type=str, help="Natural language question") @@ -2139,40 +2382,14 @@ def main(): install_parser.add_argument("software", type=str, help="Software to install") install_parser.add_argument("--execute", action="store_true", help="Execute commands") install_parser.add_argument("--dry-run", action="store_true", help="Show commands only") - install_parser.add_argument( - "--parallel", - action="store_true", - help="Enable parallel execution for multi-step installs", - ) + install_parser.add_argument("--parallel", action="store_true", help="Enable parallel execution") # Import command - import dependencies from package manager files - import_parser = subparsers.add_parser( - "import", - help="Import and install dependencies from package files", - ) - import_parser.add_argument( - "file", - nargs="?", - help="Dependency file (requirements.txt, package.json, Gemfile, Cargo.toml, go.mod)", - ) - import_parser.add_argument( - "--all", - "-a", - action="store_true", - help="Scan directory for all dependency files", - ) - import_parser.add_argument( - "--execute", - "-e", - action="store_true", - help="Execute install commands (default: dry-run)", - ) - import_parser.add_argument( - "--dev", - "-d", - action="store_true", - help="Include dev dependencies", - ) + import_parser = subparsers.add_parser("import", help="Import dependencies from package files") + import_parser.add_argument("file", nargs="?") + import_parser.add_argument("--all", "-a", action="store_true") + import_parser.add_argument("--execute", "-e", action="store_true") + import_parser.add_argument("--dev", "-d", action="store_true") # History command history_parser = subparsers.add_parser("history", help="View history") @@ -2182,301 +2399,153 @@ def main(): # Rollback command rollback_parser = subparsers.add_parser("rollback", help="Rollback installation") - rollback_parser.add_argument("id", help="Installation ID") + rollback_parser.add_argument("id") rollback_parser.add_argument("--dry-run", action="store_true") # --- New Notify Command --- notify_parser = subparsers.add_parser("notify", help="Manage desktop notifications") - notify_subs = notify_parser.add_subparsers(dest="notify_action", help="Notify actions") + notify_subs = notify_parser.add_subparsers(dest="notify_action") - notify_subs.add_parser("config", help="Show configuration") - notify_subs.add_parser("enable", help="Enable notifications") - notify_subs.add_parser("disable", help="Disable notifications") + notify_subs.add_parser("config") + notify_subs.add_parser("enable") + notify_subs.add_parser("disable") - dnd_parser = notify_subs.add_parser("dnd", help="Configure DND window") - dnd_parser.add_argument("start", help="Start time (HH:MM)") - dnd_parser.add_argument("end", help="End time (HH:MM)") + dnd_parser = notify_subs.add_parser("dnd") + dnd_parser.add_argument("start") + dnd_parser.add_argument("end") - send_parser = notify_subs.add_parser("send", help="Send test notification") - send_parser.add_argument("message", help="Notification message") + send_parser = notify_subs.add_parser("send") + send_parser.add_argument("message") send_parser.add_argument("--title", default="Cortex Notification") send_parser.add_argument("--level", choices=["low", "normal", "critical"], default="normal") - send_parser.add_argument("--actions", nargs="*", help="Action buttons") - # -------------------------- + send_parser.add_argument("--actions", nargs="*") # Stack command stack_parser = subparsers.add_parser("stack", help="Manage pre-built package stacks") - stack_parser.add_argument( - "name", nargs="?", help="Stack name to install (ml, ml-cpu, webdev, devops, data)" - ) + stack_parser.add_argument("name", nargs="?") stack_group = stack_parser.add_mutually_exclusive_group() - stack_group.add_argument("--list", "-l", action="store_true", help="List all available stacks") - stack_group.add_argument("--describe", "-d", metavar="STACK", help="Show details about a stack") - stack_parser.add_argument( - "--dry-run", action="store_true", help="Show what would be installed (requires stack name)" - ) + stack_group.add_argument("--list", "-l", action="store_true") + stack_group.add_argument("--describe", "-d") + stack_parser.add_argument("--dry-run", action="store_true") + # Cache commands cache_parser = subparsers.add_parser("cache", help="Cache operations") - cache_subs = cache_parser.add_subparsers(dest="cache_action", help="Cache actions") - cache_subs.add_parser("stats", help="Show cache statistics") + cache_subs = cache_parser.add_subparsers(dest="cache_action") + cache_subs.add_parser("stats") # --- Sandbox Commands (Docker-based package testing) --- - sandbox_parser = subparsers.add_parser( - "sandbox", help="Test packages in isolated Docker sandbox" - ) - sandbox_subs = sandbox_parser.add_subparsers(dest="sandbox_action", help="Sandbox actions") + sandbox_parser = subparsers.add_parser("sandbox", help="Test packages in Docker sandbox") + sandbox_subs = sandbox_parser.add_subparsers(dest="sandbox_action") - # sandbox create [--image IMAGE] - sandbox_create_parser = sandbox_subs.add_parser("create", help="Create a sandbox environment") - sandbox_create_parser.add_argument("name", help="Unique name for the sandbox") - sandbox_create_parser.add_argument( - "--image", default="ubuntu:22.04", help="Docker image to use (default: ubuntu:22.04)" - ) + sandbox_create_parser = sandbox_subs.add_parser("create") + sandbox_create_parser.add_argument("name") + sandbox_create_parser.add_argument("--image", default="ubuntu:22.04") # sandbox install - sandbox_install_parser = sandbox_subs.add_parser("install", help="Install a package in sandbox") - sandbox_install_parser.add_argument("name", help="Sandbox name") - sandbox_install_parser.add_argument("package", help="Package to install") + sandbox_install_parser = sandbox_subs.add_parser("install") + sandbox_install_parser.add_argument("name") + sandbox_install_parser.add_argument("package") # sandbox test [package] - sandbox_test_parser = sandbox_subs.add_parser("test", help="Run tests in sandbox") - sandbox_test_parser.add_argument("name", help="Sandbox name") - sandbox_test_parser.add_argument("package", nargs="?", help="Specific package to test") + sandbox_test_parser = sandbox_subs.add_parser("test") + sandbox_test_parser.add_argument("name") + sandbox_test_parser.add_argument("package", nargs="?") # sandbox promote [--dry-run] - sandbox_promote_parser = sandbox_subs.add_parser( - "promote", help="Install tested package on main system" - ) - sandbox_promote_parser.add_argument("name", help="Sandbox name") - sandbox_promote_parser.add_argument("package", help="Package to promote") - sandbox_promote_parser.add_argument( - "--dry-run", action="store_true", help="Show command without executing" - ) - sandbox_promote_parser.add_argument( - "-y", "--yes", action="store_true", help="Skip confirmation prompt" - ) + sandbox_promote_parser = sandbox_subs.add_parser("promote") + sandbox_promote_parser.add_argument("name") + sandbox_promote_parser.add_argument("package") + sandbox_promote_parser.add_argument("--dry-run", action="store_true") + sandbox_promote_parser.add_argument("-y", "--yes", action="store_true") # sandbox cleanup [--force] - sandbox_cleanup_parser = sandbox_subs.add_parser("cleanup", help="Remove a sandbox environment") - sandbox_cleanup_parser.add_argument("name", help="Sandbox name to remove") - sandbox_cleanup_parser.add_argument("-f", "--force", action="store_true", help="Force removal") + sandbox_cleanup_parser = sandbox_subs.add_parser("cleanup") + sandbox_cleanup_parser.add_argument("name") + sandbox_cleanup_parser.add_argument("-f", "--force", action="store_true") # sandbox list - sandbox_subs.add_parser("list", help="List all sandbox environments") + sandbox_subs.add_parser("list") # sandbox exec - sandbox_exec_parser = sandbox_subs.add_parser("exec", help="Execute command in sandbox") - sandbox_exec_parser.add_argument("name", help="Sandbox name") - sandbox_exec_parser.add_argument("cmd", nargs="+", help="Command to execute") - # -------------------------- + sandbox_exec_parser = sandbox_subs.add_parser("exec") + sandbox_exec_parser.add_argument("name") + sandbox_exec_parser.add_argument("cmd", nargs="+") # maybe needs update here # --- Environment Variable Management Commands --- env_parser = subparsers.add_parser("env", help="Manage environment variables") - env_subs = env_parser.add_subparsers(dest="env_action", help="Environment actions") + env_subs = env_parser.add_subparsers(dest="env_action") # env set [--encrypt] [--type TYPE] [--description DESC] - env_set_parser = env_subs.add_parser("set", help="Set an environment variable") - env_set_parser.add_argument("app", help="Application name") - env_set_parser.add_argument("key", help="Variable name") - env_set_parser.add_argument("value", help="Variable value") - env_set_parser.add_argument("--encrypt", "-e", action="store_true", help="Encrypt the value") + env_set_parser = env_subs.add_parser("set") + env_set_parser.add_argument("app") + env_set_parser.add_argument("key") + env_set_parser.add_argument("value") + env_set_parser.add_argument("--encrypt", "-e", action="store_true") env_set_parser.add_argument( "--type", "-t", choices=["string", "url", "port", "boolean", "integer", "path"], default="string", - help="Variable type for validation", ) - env_set_parser.add_argument("--description", "-d", help="Description of the variable") + env_set_parser.add_argument("--description", "-d") # env get [--decrypt] - env_get_parser = env_subs.add_parser("get", help="Get an environment variable") - env_get_parser.add_argument("app", help="Application name") - env_get_parser.add_argument("key", help="Variable name") - env_get_parser.add_argument( - "--decrypt", action="store_true", help="Decrypt and show encrypted values" - ) + env_get_parser = env_subs.add_parser("get") + env_get_parser.add_argument("app") + env_get_parser.add_argument("key") + env_get_parser.add_argument("--decrypt", action="store_true") # env list [--decrypt] - env_list_parser = env_subs.add_parser("list", help="List environment variables") - env_list_parser.add_argument("app", help="Application name") - env_list_parser.add_argument( - "--decrypt", action="store_true", help="Decrypt and show encrypted values" - ) + env_list_parser = env_subs.add_parser("list") + env_list_parser.add_argument("app") + env_list_parser.add_argument("--decrypt", action="store_true") # env delete - env_delete_parser = env_subs.add_parser("delete", help="Delete an environment variable") - env_delete_parser.add_argument("app", help="Application name") - env_delete_parser.add_argument("key", help="Variable name") + env_delete_parser = env_subs.add_parser("delete") + env_delete_parser.add_argument("app") + env_delete_parser.add_argument("key") # env export [--include-encrypted] [--output FILE] - env_export_parser = env_subs.add_parser("export", help="Export variables to .env format") - env_export_parser.add_argument("app", help="Application name") - env_export_parser.add_argument( - "--include-encrypted", - action="store_true", - help="Include decrypted values of encrypted variables", - ) - env_export_parser.add_argument("--output", "-o", help="Output file (default: stdout)") + env_export_parser = env_subs.add_parser("export") + env_export_parser.add_argument("app") + env_export_parser.add_argument("--include-encrypted", action="store_true") + env_export_parser.add_argument("--output", "-o") # env import [file] [--encrypt-keys KEYS] - env_import_parser = env_subs.add_parser("import", help="Import variables from .env format") - env_import_parser.add_argument("app", help="Application name") - env_import_parser.add_argument("file", nargs="?", help="Input file (default: stdin)") - env_import_parser.add_argument("--encrypt-keys", help="Comma-separated list of keys to encrypt") + env_import_parser = env_subs.add_parser("import") + env_import_parser.add_argument("app") + env_import_parser.add_argument("file", nargs="?") + env_import_parser.add_argument("--encrypt-keys") # env clear [--force] - env_clear_parser = env_subs.add_parser("clear", help="Clear all variables for an app") - env_clear_parser.add_argument("app", help="Application name") - env_clear_parser.add_argument("--force", "-f", action="store_true", help="Skip confirmation") + env_clear_parser = env_subs.add_parser("clear") + env_clear_parser.add_argument("app") + env_clear_parser.add_argument("--force", "-f", action="store_true") # env apps - list all apps with environments - env_subs.add_parser("apps", help="List all apps with stored environments") + env_subs.add_parser("apps") # env load - load into os.environ - env_load_parser = env_subs.add_parser("load", help="Load variables into current environment") - env_load_parser.add_argument("app", help="Application name") + env_load_parser = env_subs.add_parser("load") + env_load_parser.add_argument("app") # env template subcommands - env_template_parser = env_subs.add_parser("template", help="Manage environment templates") - env_template_subs = env_template_parser.add_subparsers( - dest="template_action", help="Template actions" - ) + env_template_parser = env_subs.add_parser("template") + env_template_subs = env_template_parser.add_subparsers(dest="template_action") # env template list - env_template_subs.add_parser("list", help="List available templates") - + env_template_subs.add_parser("list") # env template show - env_template_show_parser = env_template_subs.add_parser("show", help="Show template details") - env_template_show_parser.add_argument("template_name", help="Template name") + env_template_show_parser = env_template_subs.add_parser("show") + env_template_show_parser.add_argument("template_name") # env template apply