-
-
Notifications
You must be signed in to change notification settings - Fork 52
[pin] Implement package version pinning system #627
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -26,6 +26,13 @@ | |||||
| from cortex.llm.interpreter import CommandInterpreter | ||||||
| from cortex.network_config import NetworkConfig | ||||||
| from cortex.notification_manager import NotificationManager | ||||||
| from cortex.pin_manager import ( | ||||||
| PackageSource, | ||||||
| PinManager, | ||||||
| PinType, | ||||||
| get_pin_manager, | ||||||
| parse_package_spec, | ||||||
| ) | ||||||
| from cortex.role_manager import RoleManager | ||||||
| from cortex.stack_manager import StackManager | ||||||
| from cortex.uninstall_impact import ( | ||||||
|
|
@@ -817,6 +824,7 @@ def install( | |||||
| execute: bool = False, | ||||||
| dry_run: bool = False, | ||||||
| parallel: bool = False, | ||||||
| force: bool = False, | ||||||
| ): | ||||||
| # Validate input first | ||||||
| is_valid, error = validate_install_request(software) | ||||||
|
|
@@ -872,6 +880,54 @@ def install( | |||||
| # Extract packages from commands for tracking | ||||||
| packages = history._extract_packages_from_commands(commands) | ||||||
|
|
||||||
| # Check for pinned packages | ||||||
| pin_mgr = get_pin_manager() | ||||||
| pinned_packages = [] | ||||||
| blocked_packages = [] | ||||||
|
|
||||||
| for pkg in packages: | ||||||
| pin = pin_mgr.get_pin(pkg) | ||||||
| if pin: | ||||||
| # Check if this is an update that would violate the pin | ||||||
| # For now, we'll warn about any pinned package in the install list | ||||||
| # In a more sophisticated implementation, we'd check the actual version | ||||||
| if not force: | ||||||
| blocked_packages.append((pkg, pin)) | ||||||
| else: | ||||||
| pinned_packages.append((pkg, pin)) | ||||||
|
|
||||||
| # Show warnings for pinned packages | ||||||
| if blocked_packages: | ||||||
| console.print() | ||||||
| cx_print("⚠️ Pinned packages detected:", "warning") | ||||||
| for pkg, pin in blocked_packages: | ||||||
| console.print(f" • {pkg} (pinned to {pin.format_version_display()})") | ||||||
| console.print() | ||||||
| cx_print("Use --force to override pins", "info") | ||||||
| console.print("Example: cortex install <package> --execute --force") | ||||||
| return 1 | ||||||
|
|
||||||
| if pinned_packages: | ||||||
| console.print() | ||||||
| cx_print("⚠️ Pinned packages detected:", "warning") | ||||||
| for pkg, pin in pinned_packages: | ||||||
| console.print(f" • {pkg} (pinned to {pin.format_version_display()})") | ||||||
| console.print() | ||||||
|
|
||||||
| if force and execute: | ||||||
| # Confirm override | ||||||
| try: | ||||||
| response = console.input( | ||||||
| "[bold yellow]Package is pinned. Override? (y/N): [/bold yellow]" | ||||||
| ) | ||||||
| if response.lower() not in ("y", "yes"): | ||||||
| cx_print("Operation cancelled", "info") | ||||||
| return 0 | ||||||
| except (EOFError, KeyboardInterrupt): | ||||||
| console.print() | ||||||
| cx_print("Operation cancelled", "info") | ||||||
| return 0 | ||||||
|
|
||||||
| # Record installation start | ||||||
| if execute or dry_run: | ||||||
| install_id = history.record_installation( | ||||||
|
|
@@ -2825,6 +2881,208 @@ def progress_callback(current: int, total: int, step: InstallationStep) -> None: | |||||
| console.print(f"Error: {result.error_message}", style="red") | ||||||
| return 1 | ||||||
|
|
||||||
| # --- Pin Management Commands --- | ||||||
| def pin(self, args: argparse.Namespace) -> int: | ||||||
| """Handle package version pinning commands""" | ||||||
| action = getattr(args, "pin_action", None) | ||||||
|
|
||||||
| if not action: | ||||||
| self._print_error("Please specify a subcommand (add/remove/list/export/import/clear)") | ||||||
| return 1 | ||||||
|
|
||||||
| pin_mgr = get_pin_manager() | ||||||
|
|
||||||
| try: | ||||||
| if action == "add": | ||||||
| return self._pin_add(pin_mgr, args) | ||||||
| elif action == "remove": | ||||||
| return self._pin_remove(pin_mgr, args) | ||||||
| elif action == "list": | ||||||
| return self._pin_list(pin_mgr, args) | ||||||
| elif action == "export": | ||||||
| return self._pin_export(pin_mgr, args) | ||||||
| elif action == "import": | ||||||
| return self._pin_import(pin_mgr, args) | ||||||
| elif action == "clear": | ||||||
| return self._pin_clear(pin_mgr, args) | ||||||
| else: | ||||||
| self._print_error(f"Unknown pin action: {action}") | ||||||
| return 1 | ||||||
| except Exception as e: | ||||||
| self._print_error(f"Pin operation failed: {e}") | ||||||
| if self.verbose: | ||||||
| import traceback | ||||||
|
|
||||||
| traceback.print_exc() | ||||||
| return 1 | ||||||
|
|
||||||
| def _pin_add(self, pin_mgr: PinManager, args: argparse.Namespace) -> int: | ||||||
| """Add a package pin""" | ||||||
| spec = args.package_spec | ||||||
| reason = getattr(args, "reason", None) | ||||||
| pin_type_str = getattr(args, "type", None) | ||||||
| source_str = getattr(args, "source", None) | ||||||
| sync_apt = getattr(args, "sync_apt", False) | ||||||
|
|
||||||
| # Parse package spec (e.g., "postgresql@14.10" or "nginx") | ||||||
| package, version = parse_package_spec(spec) | ||||||
|
|
||||||
| if not version: | ||||||
| self._print_error( | ||||||
| "Version required. Use format: package@version (e.g., postgresql@14.10)" | ||||||
| ) | ||||||
| return 1 | ||||||
|
|
||||||
| # Detect pin type if not specified | ||||||
| if pin_type_str: | ||||||
| try: | ||||||
| pin_type = PinType(pin_type_str) | ||||||
| except ValueError: | ||||||
| self._print_error( | ||||||
| f"Invalid pin type: {pin_type_str}. Use: exact, minor, major, range" | ||||||
| ) | ||||||
| return 1 | ||||||
| else: | ||||||
| # Auto-detect from version string | ||||||
| if "," in version or any(op in version for op in [">=", "<=", ">", "<", "!="]): | ||||||
| pin_type = PinType.RANGE | ||||||
| elif version.endswith(".*") or "*" in version: | ||||||
| pin_type = PinType.MINOR | ||||||
| else: | ||||||
| pin_type = PinType.EXACT | ||||||
|
|
||||||
| # Parse source | ||||||
| if source_str: | ||||||
| try: | ||||||
| source = PackageSource(source_str) | ||||||
| except ValueError: | ||||||
| self._print_error(f"Invalid source: {source_str}. Use: apt, pip, npm") | ||||||
| return 1 | ||||||
| else: | ||||||
| # Default to apt | ||||||
| source = PackageSource.APT | ||||||
|
|
||||||
| success, message = pin_mgr.add_pin( | ||||||
| package=package, | ||||||
| version=version, | ||||||
| reason=reason, | ||||||
| pin_type=pin_type, | ||||||
| source=source, | ||||||
| sync_apt=sync_apt, | ||||||
| ) | ||||||
|
|
||||||
| if success: | ||||||
| cx_print(f"✓ {message}", "success") | ||||||
| return 0 | ||||||
| else: | ||||||
| self._print_error(message) | ||||||
| return 1 | ||||||
|
|
||||||
| def _pin_remove(self, pin_mgr: PinManager, args: argparse.Namespace) -> int: | ||||||
| """Remove a package pin""" | ||||||
| package = args.package | ||||||
| sync_apt = getattr(args, "sync_apt", False) | ||||||
|
|
||||||
| success, message = pin_mgr.remove_pin(package, sync_apt=sync_apt) | ||||||
|
|
||||||
| if success: | ||||||
| cx_print(f"✓ {message}", "success") | ||||||
| return 0 | ||||||
| else: | ||||||
| self._print_error(message) | ||||||
| return 1 | ||||||
|
|
||||||
| def _pin_list(self, pin_mgr: PinManager, args: argparse.Namespace) -> int: | ||||||
| """List all pinned packages""" | ||||||
| source_str = getattr(args, "source", None) | ||||||
| source = PackageSource(source_str) if source_str else None | ||||||
|
|
||||||
| pins = pin_mgr.list_pins(source=source) | ||||||
|
|
||||||
| if not pins: | ||||||
| cx_print("No packages are pinned", "info") | ||||||
| return 0 | ||||||
|
|
||||||
| cx_header("Pinned Packages") | ||||||
| console.print() | ||||||
|
|
||||||
| from rich.table import Table | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The import |
||||||
|
|
||||||
| table = Table(show_header=True, header_style="bold cyan", box=None) | ||||||
| table.add_column("Package", style="green") | ||||||
| table.add_column("Version", style="cyan") | ||||||
| table.add_column("Type", style="yellow") | ||||||
| table.add_column("Source", style="blue") | ||||||
| table.add_column("Pinned", style="dim") | ||||||
|
|
||||||
| for pin in pins: | ||||||
| age_days = pin.get_age_days() | ||||||
| age_str = ( | ||||||
| f"{age_days} day{'s' if age_days != 1 else ''} ago" if age_days > 0 else "today" | ||||||
| ) | ||||||
| table.add_row( | ||||||
| pin.package, | ||||||
| pin.format_version_display(), | ||||||
| pin.pin_type.value, | ||||||
| pin.source.value, | ||||||
| age_str, | ||||||
| ) | ||||||
|
|
||||||
| console.print(table) | ||||||
| console.print() | ||||||
| console.print(f"[dim]Total: {len(pins)} pinned package(s)[/dim]") | ||||||
| return 0 | ||||||
|
|
||||||
| def _pin_export(self, pin_mgr: PinManager, args: argparse.Namespace) -> int: | ||||||
| """Export pins to a file""" | ||||||
| output_file = getattr(args, "output", None) or "pins.json" | ||||||
|
|
||||||
| success, message = pin_mgr.export_pins(output_file) | ||||||
|
|
||||||
| if success: | ||||||
| cx_print(f"✓ {message}", "success") | ||||||
| return 0 | ||||||
| else: | ||||||
| self._print_error(message) | ||||||
| return 1 | ||||||
|
|
||||||
| def _pin_import(self, pin_mgr: PinManager, args: argparse.Namespace) -> int: | ||||||
| """Import pins from a file""" | ||||||
| input_file = args.file | ||||||
| merge = not getattr(args, "replace", False) | ||||||
|
|
||||||
| success, message, imported = pin_mgr.import_pins(input_file, merge=merge) | ||||||
|
|
||||||
| if success: | ||||||
| cx_print(f"✓ {message}", "success") | ||||||
| if imported: | ||||||
| console.print(f"[dim]Imported packages: {', '.join(imported[:10])}[/dim]") | ||||||
| if len(imported) > 10: | ||||||
| console.print(f"[dim]... and {len(imported) - 10} more[/dim]") | ||||||
| return 0 | ||||||
| else: | ||||||
| self._print_error(message) | ||||||
| return 1 | ||||||
|
|
||||||
| def _pin_clear(self, pin_mgr: PinManager, args: argparse.Namespace) -> int: | ||||||
| """Clear all pins""" | ||||||
| force = getattr(args, "force", False) | ||||||
|
|
||||||
| if not force: | ||||||
| confirm = input("⚠️ Clear ALL pins? (y/N): ") | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For consistency with the rest of the CLI, please use
Suggested change
|
||||||
| if confirm.lower() not in ("y", "yes"): | ||||||
| cx_print("Operation cancelled", "info") | ||||||
| return 0 | ||||||
|
|
||||||
| success, message = pin_mgr.clear_all_pins() | ||||||
|
|
||||||
| if success: | ||||||
| cx_print(f"✓ {message}", "success") | ||||||
| return 0 | ||||||
| else: | ||||||
| self._print_error(message) | ||||||
| return 1 | ||||||
|
|
||||||
| # -------------------------- | ||||||
|
|
||||||
|
|
||||||
|
|
@@ -2867,6 +3125,7 @@ def show_rich_help(): | |||||
| table.add_row("docker permissions", "Fix Docker bind-mount permissions") | ||||||
| table.add_row("sandbox <cmd>", "Test packages in Docker sandbox") | ||||||
| table.add_row("update", "Check for and install updates") | ||||||
| table.add_row("pin", "Manage package version pins") | ||||||
|
|
||||||
| console.print(table) | ||||||
| console.print() | ||||||
|
|
@@ -3026,6 +3285,12 @@ def main(): | |||||
| action="store_true", | ||||||
| help="Enable parallel execution for multi-step installs", | ||||||
| ) | ||||||
| install_parser.add_argument( | ||||||
| "--force", | ||||||
| "-f", | ||||||
| action="store_true", | ||||||
| help="Force installation even if package is pinned", | ||||||
| ) | ||||||
|
|
||||||
| # Remove command - uninstall with impact analysis | ||||||
| remove_parser = subparsers.add_parser( | ||||||
|
|
@@ -3422,6 +3687,53 @@ def main(): | |||||
| ) | ||||||
| # -------------------------- | ||||||
|
|
||||||
| # --- Pin Management Commands --- | ||||||
| pin_parser = subparsers.add_parser("pin", help="Manage package version pins") | ||||||
| pin_subs = pin_parser.add_subparsers(dest="pin_action", help="Pin actions") | ||||||
|
|
||||||
| # pin add <package@version> [--type TYPE] [--source SOURCE] [--reason REASON] [--sync-apt] | ||||||
| pin_add_parser = pin_subs.add_parser("add", help="Pin a package version") | ||||||
| pin_add_parser.add_argument("package_spec", help="Package and version (e.g., postgresql@14.10)") | ||||||
| pin_add_parser.add_argument( | ||||||
| "--type", | ||||||
| "-t", | ||||||
| choices=["exact", "minor", "major", "range"], | ||||||
| help="Pin type (auto-detected if not specified)", | ||||||
| ) | ||||||
| pin_add_parser.add_argument( | ||||||
| "--source", "-s", choices=["apt", "pip", "npm"], help="Package source (default: apt)" | ||||||
| ) | ||||||
| pin_add_parser.add_argument("--reason", "-r", help="Reason for pinning") | ||||||
| pin_add_parser.add_argument("--sync-apt", action="store_true", help="Also run apt-mark hold") | ||||||
|
|
||||||
| # pin remove <package> [--sync-apt] | ||||||
| pin_remove_parser = pin_subs.add_parser("remove", help="Remove a package pin") | ||||||
| pin_remove_parser.add_argument("package", help="Package name") | ||||||
| pin_remove_parser.add_argument( | ||||||
| "--sync-apt", action="store_true", help="Also run apt-mark unhold" | ||||||
| ) | ||||||
|
|
||||||
| # pin list [--source SOURCE] | ||||||
| pin_list_parser = pin_subs.add_parser("list", help="List all pinned packages") | ||||||
| pin_list_parser.add_argument( | ||||||
| "--source", "-s", choices=["apt", "pip", "npm"], help="Filter by source" | ||||||
| ) | ||||||
|
|
||||||
| # pin export [--output FILE] | ||||||
| pin_export_parser = pin_subs.add_parser("export", help="Export pins to file") | ||||||
| pin_export_parser.add_argument("--output", "-o", help="Output file (default: pins.json)") | ||||||
|
|
||||||
| # pin import <file> [--replace] | ||||||
| pin_import_parser = pin_subs.add_parser("import", help="Import pins from file") | ||||||
| pin_import_parser.add_argument("file", help="Input file") | ||||||
| pin_import_parser.add_argument( | ||||||
| "--replace", action="store_true", help="Replace all pins instead of merging" | ||||||
| ) | ||||||
|
|
||||||
| # pin clear [--force] | ||||||
| pin_clear_parser = pin_subs.add_parser("clear", help="Clear all pins") | ||||||
| pin_clear_parser.add_argument("--force", "-f", action="store_true", help="Skip confirmation") | ||||||
|
|
||||||
| # License and upgrade commands | ||||||
| subparsers.add_parser("upgrade", help="Upgrade to Cortex Pro") | ||||||
| subparsers.add_parser("license", help="Show license status") | ||||||
|
|
@@ -3596,6 +3908,7 @@ def main(): | |||||
| execute=args.execute, | ||||||
| dry_run=args.dry_run, | ||||||
| parallel=args.parallel, | ||||||
| force=getattr(args, "force", False), | ||||||
| ) | ||||||
| elif args.command == "remove": | ||||||
| # Handle --execute flag to override default dry-run | ||||||
|
|
@@ -3604,6 +3917,8 @@ def main(): | |||||
| return cli.remove(args) | ||||||
| elif args.command == "import": | ||||||
| return cli.import_deps(args) | ||||||
| elif args.command == "pin": | ||||||
| return cli.pin(args) | ||||||
| elif args.command == "history": | ||||||
| return cli.history(limit=args.limit, status=args.status, show_id=args.show_id) | ||||||
| elif args.command == "rollback": | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The auto-detection logic for the pin type is incomplete. It correctly identifies
rangeandminorpins, but it will incorrectly classify a major version pin (e.g.,postgresql@14) asexact. This can lead to unexpected behavior for users.The
pin_manager.pyfile contains a more robust detection logic in its private_detect_pin_typemethod. To fix this and avoid code duplication, I recommend making that logic available as a public utility function inpin_manager.pyand calling it here.For example, you could create a standalone function
detect_pin_type_from_versioninpin_manager.pyand use it here to ensure pin types are detected consistently and correctly.