diff --git a/cortex/cli.py b/cortex/cli.py index 939f66ef..574ce584 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -10,6 +10,7 @@ from cortex.branding import VERSION, console, cx_header, cx_print, show_banner from cortex.coordinator import InstallationCoordinator, StepStatus from cortex.demo import run_demo +from cortex.env_manager import EnvironmentManager, get_env_manager from cortex.installation_history import InstallationHistory, InstallationStatus, InstallationType from cortex.llm.interpreter import CommandInterpreter from cortex.notification_manager import NotificationManager @@ -488,17 +489,28 @@ def parallel_log_callback(message: str, level: str = "info"): print(f" View details: cortex history show {install_id}") return 1 - except Exception as e: + except (ValueError, OSError) as e: if install_id: history.update_installation( install_id, InstallationStatus.FAILED, str(e) ) self._print_error(f"Parallel execution failed: {str(e)}") return 1 + except Exception as e: + if install_id: + history.update_installation( + install_id, InstallationStatus.FAILED, str(e) + ) + self._print_error(f"Unexpected parallel execution error: {str(e)}") + if self.verbose: + import traceback + + traceback.print_exc() + return 1 coordinator = InstallationCoordinator( commands=commands, - descriptions=[f"Step {i+1}" for i in range(len(commands))], + descriptions=[f"Step {i + 1}" for i in range(len(commands))], timeout=300, stop_on_error=True, progress_callback=progress_callback, @@ -551,10 +563,19 @@ def parallel_log_callback(message: str, level: str = "info"): history.update_installation(install_id, InstallationStatus.FAILED, str(e)) self._print_error(f"API call failed: {str(e)}") return 1 + except OSError as e: + if install_id: + history.update_installation(install_id, InstallationStatus.FAILED, str(e)) + self._print_error(f"System error: {str(e)}") + return 1 except Exception as e: if install_id: history.update_installation(install_id, InstallationStatus.FAILED, str(e)) self._print_error(f"Unexpected error: {str(e)}") + if self.verbose: + import traceback + + traceback.print_exc() return 1 def cache_stats(self) -> int: @@ -571,9 +592,16 @@ def cache_stats(self) -> int: cx_print(f"Hit rate: {hit_rate}", "info") cx_print(f"Saved calls (approx): {stats.hits}", "info") return 0 - except Exception as e: + except (ImportError, OSError) as e: self._print_error(f"Unable to read cache stats: {e}") return 1 + except Exception as e: + self._print_error(f"Unexpected error reading cache stats: {e}") + if self.verbose: + import traceback + + traceback.print_exc() + return 1 def history(self, limit: int = 20, status: str | None = None, show_id: str | None = None): """Show installation history""" @@ -627,16 +655,23 @@ def history(self, limit: int = 20, status: str | None = None, show_id: str | Non date = r.timestamp[:19].replace("T", " ") packages = ", ".join(r.packages[:2]) if len(r.packages) > 2: - packages += f" +{len(r.packages)-2}" + packages += f" +{len(r.packages) - 2}" print( f"{r.id:<18} {date:<20} {r.operation_type.value:<12} {packages:<30} {r.status.value:<15}" ) return 0 - except Exception as e: + except (ValueError, OSError) as e: self._print_error(f"Failed to retrieve history: {str(e)}") return 1 + except Exception as e: + self._print_error(f"Unexpected error retrieving history: {str(e)}") + if self.verbose: + import traceback + + traceback.print_exc() + return 1 def rollback(self, install_id: str, dry_run: bool = False): """Rollback an installation""" @@ -655,9 +690,16 @@ def rollback(self, install_id: str, dry_run: bool = False): else: self._print_error(message) return 1 - except Exception as e: + except (ValueError, OSError) as e: self._print_error(f"Rollback failed: {str(e)}") return 1 + except Exception as e: + self._print_error(f"Unexpected rollback error: {str(e)}") + if self.verbose: + import traceback + + traceback.print_exc() + return 1 def _get_prefs_manager(self): """Lazy initialize preferences manager""" @@ -684,9 +726,16 @@ def check_pref(self, key: str | None = None): print_all_preferences(manager) return 0 - except Exception as e: + except (ValueError, OSError) as e: self._print_error(f"Failed to read preferences: {str(e)}") return 1 + except Exception as e: + self._print_error(f"Unexpected error reading preferences: {str(e)}") + if self.verbose: + import traceback + + traceback.print_exc() + return 1 def edit_pref(self, action: str, key: str | None = None, value: str | None = None): """Edit user preferences (add/set, delete/remove, list)""" @@ -733,9 +782,16 @@ def edit_pref(self, action: str, key: str | None = None, value: str | None = Non self._print_error(f"Unknown action: {action}") return 1 - except Exception as e: + except (ValueError, OSError) as e: self._print_error(f"Failed to edit preferences: {str(e)}") return 1 + except Exception as e: + self._print_error(f"Unexpected error editing preferences: {str(e)}") + if self.verbose: + import traceback + + traceback.print_exc() + return 1 def status(self): """Show system status including security features""" @@ -780,6 +836,364 @@ def wizard(self): cx_print("Please export your API key in your shell profile.", "info") return 0 + def env(self, args: argparse.Namespace) -> int: + """Handle environment variable management commands.""" + env_mgr = get_env_manager() + + # Handle subcommand routing + action = getattr(args, "env_action", None) + + if not action: + self._print_error( + "Please specify a subcommand (set/get/list/delete/export/import/clear/template)" + ) + return 1 + + try: + if action == "set": + return self._env_set(env_mgr, args) + elif action == "get": + return self._env_get(env_mgr, args) + elif action == "list": + return self._env_list(env_mgr, args) + elif action == "delete": + return self._env_delete(env_mgr, args) + elif action == "export": + return self._env_export(env_mgr, args) + elif action == "import": + return self._env_import(env_mgr, args) + elif action == "clear": + return self._env_clear(env_mgr, args) + elif action == "template": + return self._env_template(env_mgr, args) + elif action == "apps": + return self._env_list_apps(env_mgr, args) + elif action == "load": + return self._env_load(env_mgr, args) + else: + self._print_error(f"Unknown env subcommand: {action}") + return 1 + except (ValueError, OSError) as e: + self._print_error(f"Environment operation failed: {e}") + return 1 + except Exception as e: + self._print_error(f"Unexpected error: {e}") + if self.verbose: + import traceback + + traceback.print_exc() + return 1 + + def _env_set(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> int: + """Set an environment variable.""" + app = args.app + key = args.key + value = args.value + encrypt = getattr(args, "encrypt", False) + var_type = getattr(args, "type", "string") or "string" + description = getattr(args, "description", "") or "" + + try: + env_mgr.set_variable( + app=app, + key=key, + value=value, + encrypt=encrypt, + var_type=var_type, + description=description, + ) + + if encrypt: + cx_print("πŸ” Variable encrypted and stored", "success") + else: + cx_print("βœ“ Environment variable set", "success") + return 0 + + except ValueError as e: + self._print_error(str(e)) + return 1 + except ImportError as e: + self._print_error(str(e)) + if "cryptography" in str(e).lower(): + cx_print("Install with: pip install cryptography", "info") + return 1 + + def _env_get(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> int: + """Get an environment variable value.""" + app = args.app + key = args.key + show_encrypted = getattr(args, "decrypt", False) + + value = env_mgr.get_variable(app, key, decrypt=show_encrypted) + + if value is None: + self._print_error(f"Variable '{key}' not found for app '{app}'") + return 1 + + var_info = env_mgr.get_variable_info(app, key) + + if var_info and var_info.encrypted and not show_encrypted: + console.print(f"{key}: [dim][encrypted][/dim]") + else: + console.print(f"{key}: {value}") + + return 0 + + def _env_list(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> int: + """List all environment variables for an app.""" + app = args.app + show_encrypted = getattr(args, "decrypt", False) + + variables = env_mgr.list_variables(app) + + if not variables: + cx_print(f"No environment variables set for '{app}'", "info") + return 0 + + cx_header(f"Environment: {app}") + + for var in sorted(variables, key=lambda v: v.key): + if var.encrypted: + if show_encrypted: + try: + value = env_mgr.get_variable(app, var.key, decrypt=True) + console.print(f" {var.key}: {value} [dim](decrypted)[/dim]") + except ValueError: + console.print(f" {var.key}: [red][decryption failed][/red]") + else: + console.print(f" {var.key}: [yellow][encrypted][/yellow]") + else: + console.print(f" {var.key}: {var.value}") + + if var.description: + console.print(f" [dim]# {var.description}[/dim]") + + console.print() + console.print(f"[dim]Total: {len(variables)} variable(s)[/dim]") + return 0 + + def _env_delete(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> int: + """Delete an environment variable.""" + app = args.app + key = args.key + + if env_mgr.delete_variable(app, key): + cx_print(f"βœ“ Deleted '{key}' from '{app}'", "success") + return 0 + else: + self._print_error(f"Variable '{key}' not found for app '{app}'") + return 1 + + def _env_export(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> int: + """Export environment variables to .env format.""" + app = args.app + include_encrypted = getattr(args, "include_encrypted", False) + output_file = getattr(args, "output", None) + + content = env_mgr.export_env(app, include_encrypted=include_encrypted) + + if not content: + cx_print(f"No environment variables to export for '{app}'", "info") + return 0 + + if output_file: + try: + with open(output_file, "w", encoding="utf-8") as f: + f.write(content) + cx_print(f"βœ“ Exported to {output_file}", "success") + except OSError as e: + self._print_error(f"Failed to write file: {e}") + return 1 + else: + # Print to stdout + print(content, end="") + + return 0 + + def _env_import(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> int: + """Import environment variables from .env format.""" + import sys + + app = args.app + input_file = getattr(args, "file", None) + encrypt_keys = getattr(args, "encrypt_keys", None) + + try: + if input_file: + with open(input_file, encoding="utf-8") as f: + content = f.read() + elif not sys.stdin.isatty(): + content = sys.stdin.read() + else: + self._print_error("No input file specified and stdin is empty") + cx_print("Usage: cortex env import ", "info") + cx_print(" or: cat .env | cortex env import ", "info") + return 1 + + # Parse encrypt-keys argument + encrypt_list = [] + if encrypt_keys: + encrypt_list = [k.strip() for k in encrypt_keys.split(",")] + + count, errors = env_mgr.import_env(app, content, encrypt_keys=encrypt_list) + + if errors: + for err in errors: + cx_print(f" ⚠ {err}", "warning") + + if count > 0: + cx_print(f"βœ“ Imported {count} variable(s) to '{app}'", "success") + else: + cx_print("No variables imported", "info") + + # Return success (0) even with partial errors - some vars imported successfully + return 0 + + except FileNotFoundError: + self._print_error(f"File not found: {input_file}") + return 1 + except OSError as e: + self._print_error(f"Failed to read file: {e}") + return 1 + + def _env_clear(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> int: + """Clear all environment variables for an app.""" + app = args.app + force = getattr(args, "force", False) + + # Confirm unless --force is used + if not force: + confirm = input(f"⚠️ Clear ALL environment variables for '{app}'? (y/n): ") + if confirm.lower() != "y": + cx_print("Operation cancelled", "info") + return 0 + + if env_mgr.clear_app(app): + cx_print(f"βœ“ Cleared all variables for '{app}'", "success") + else: + cx_print(f"No environment data found for '{app}'", "info") + + return 0 + + def _env_template(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> int: + """Handle template subcommands.""" + template_action = getattr(args, "template_action", None) + + if template_action == "list": + return self._env_template_list(env_mgr) + elif template_action == "show": + return self._env_template_show(env_mgr, args) + elif template_action == "apply": + return self._env_template_apply(env_mgr, args) + else: + self._print_error( + "Please specify: template list, template show , or template apply " + ) + return 1 + + def _env_template_list(self, env_mgr: EnvironmentManager) -> int: + """List available templates.""" + templates = env_mgr.list_templates() + + cx_header("Available Environment Templates") + + for template in sorted(templates, key=lambda t: t.name): + console.print(f" [green]{template.name}[/green]") + console.print(f" {template.description}") + console.print(f" [dim]{len(template.variables)} variables[/dim]") + console.print() + + cx_print("Use 'cortex env template show ' for details", "info") + return 0 + + def _env_template_show(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> int: + """Show template details.""" + template_name = args.template_name + + template = env_mgr.get_template(template_name) + if not template: + self._print_error(f"Template '{template_name}' not found") + return 1 + + cx_header(f"Template: {template.name}") + console.print(f" {template.description}") + console.print() + + console.print("[bold]Variables:[/bold]") + for var in template.variables: + req = "[red]*[/red]" if var.required else " " + default = f" = {var.default}" if var.default else "" + console.print(f" {req} [cyan]{var.name}[/cyan] ({var.var_type}){default}") + if var.description: + console.print(f" [dim]{var.description}[/dim]") + + console.print() + console.print("[dim]* = required[/dim]") + return 0 + + def _env_template_apply(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> int: + """Apply a template to an app.""" + template_name = args.template_name + app = args.app + + # Parse key=value pairs from args + values = {} + value_args = getattr(args, "values", []) or [] + for val in value_args: + if "=" in val: + k, v = val.split("=", 1) + values[k] = v + + # Parse encrypt keys + encrypt_keys = [] + encrypt_arg = getattr(args, "encrypt_keys", None) + if encrypt_arg: + encrypt_keys = [k.strip() for k in encrypt_arg.split(",")] + + result = env_mgr.apply_template( + template_name=template_name, + app=app, + values=values, + encrypt_keys=encrypt_keys, + ) + + if result.valid: + cx_print(f"βœ“ Applied template '{template_name}' to '{app}'", "success") + return 0 + else: + self._print_error(f"Failed to apply template '{template_name}'") + for err in result.errors: + console.print(f" [red]βœ—[/red] {err}") + return 1 + + def _env_list_apps(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> int: + """List all apps with stored environments.""" + apps = env_mgr.list_apps() + + if not apps: + cx_print("No applications with stored environments", "info") + return 0 + + cx_header("Applications with Environments") + for app in apps: + var_count = len(env_mgr.list_variables(app)) + console.print(f" [green]{app}[/green] [dim]({var_count} variables)[/dim]") + + return 0 + + def _env_load(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> int: + """Load environment variables into current process.""" + app = args.app + + count = env_mgr.load_to_environ(app) + + if count > 0: + cx_print(f"βœ“ Loaded {count} variable(s) from '{app}' into environment", "success") + else: + cx_print(f"No variables to load for '{app}'", "info") + + return 0 + def show_rich_help(): """Display beautifully formatted help using Rich""" @@ -805,6 +1219,7 @@ def show_rich_help(): table.add_row("history", "View history") table.add_row("rollback ", "Undo installation") table.add_row("notify", "Manage desktop notifications") + table.add_row("env", "Manage environment variables") table.add_row("cache stats", "Show LLM cache statistics") table.add_row("stack ", "Install the stack") table.add_row("doctor", "System health check") @@ -934,6 +1349,98 @@ def main(): cache_subs = cache_parser.add_subparsers(dest="cache_action", help="Cache actions") cache_subs.add_parser("stats", help="Show cache statistics") + # --- Environment Variable Management Commands --- + env_parser = subparsers.add_parser("env", help="Manage environment variables") + env_subs = env_parser.add_subparsers(dest="env_action", help="Environment actions") + + # env set [--encrypt] [--type TYPE] [--description DESC] + env_set_parser = env_subs.add_parser("set", help="Set an environment variable") + env_set_parser.add_argument("app", help="Application name") + env_set_parser.add_argument("key", help="Variable name") + env_set_parser.add_argument("value", help="Variable value") + env_set_parser.add_argument("--encrypt", "-e", action="store_true", help="Encrypt the value") + env_set_parser.add_argument( + "--type", + "-t", + choices=["string", "url", "port", "boolean", "integer", "path"], + default="string", + help="Variable type for validation", + ) + env_set_parser.add_argument("--description", "-d", help="Description of the variable") + + # env get [--decrypt] + env_get_parser = env_subs.add_parser("get", help="Get an environment variable") + env_get_parser.add_argument("app", help="Application name") + env_get_parser.add_argument("key", help="Variable name") + env_get_parser.add_argument( + "--decrypt", action="store_true", help="Decrypt and show encrypted values" + ) + + # env list [--decrypt] + env_list_parser = env_subs.add_parser("list", help="List environment variables") + env_list_parser.add_argument("app", help="Application name") + env_list_parser.add_argument( + "--decrypt", action="store_true", help="Decrypt and show encrypted values" + ) + + # env delete + env_delete_parser = env_subs.add_parser("delete", help="Delete an environment variable") + env_delete_parser.add_argument("app", help="Application name") + env_delete_parser.add_argument("key", help="Variable name") + + # env export [--include-encrypted] [--output FILE] + env_export_parser = env_subs.add_parser("export", help="Export variables to .env format") + env_export_parser.add_argument("app", help="Application name") + env_export_parser.add_argument( + "--include-encrypted", + action="store_true", + help="Include decrypted values of encrypted variables", + ) + env_export_parser.add_argument("--output", "-o", help="Output file (default: stdout)") + + # env import [file] [--encrypt-keys KEYS] + env_import_parser = env_subs.add_parser("import", help="Import variables from .env format") + env_import_parser.add_argument("app", help="Application name") + env_import_parser.add_argument("file", nargs="?", help="Input file (default: stdin)") + env_import_parser.add_argument("--encrypt-keys", help="Comma-separated list of keys to encrypt") + + # env clear [--force] + env_clear_parser = env_subs.add_parser("clear", help="Clear all variables for an app") + env_clear_parser.add_argument("app", help="Application name") + env_clear_parser.add_argument("--force", "-f", action="store_true", help="Skip confirmation") + + # env apps - list all apps with environments + env_subs.add_parser("apps", help="List all apps with stored environments") + + # env load - load into os.environ + env_load_parser = env_subs.add_parser("load", help="Load variables into current environment") + env_load_parser.add_argument("app", help="Application name") + + # env template subcommands + env_template_parser = env_subs.add_parser("template", help="Manage environment templates") + env_template_subs = env_template_parser.add_subparsers( + dest="template_action", help="Template actions" + ) + + # env template list + env_template_subs.add_parser("list", help="List available templates") + + # env template show + env_template_show_parser = env_template_subs.add_parser("show", help="Show template details") + env_template_show_parser.add_argument("template_name", help="Template name") + + # env template apply