diff --git a/cortex/cli.py b/cortex/cli.py index 267228b0..278d6ef4 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -852,6 +852,28 @@ def _confirm_risky_operation(self, prediction: FailurePrediction) -> bool: # --- End Sandbox Commands --- + def monitor(self, args: argparse.Namespace) -> int: + """ + Monitor system resource usage (CPU, RAM, Disk, Network) in real-time. + + Args: + args: Parsed command-line arguments + + Returns: + Exit code (0 for success) + """ + from cortex.monitor.monitor_ui import run_standalone_monitor + + duration = getattr(args, "duration", None) + interval = getattr(args, "interval", 1.0) + export_path = getattr(args, "export", None) + + return run_standalone_monitor( + duration=duration, + interval=interval, + export_path=export_path, + ) + def ask(self, question: str) -> int: """Answer a natural language question about the system.""" api_key = self._get_api_key() @@ -1445,8 +1467,8 @@ def install( dry_run: bool = False, parallel: bool = False, json_output: bool = False, + monitor: bool = False, ) -> int: - """Install software using the LLM-powered package manager.""" # Initialize installation history history = InstallationHistory() install_id = None @@ -1670,6 +1692,7 @@ def parallel_log_callback(message: str, level: str = "info"): timeout=300, stop_on_error=True, progress_callback=progress_callback, + enable_monitoring=monitor, ) result = coordinator.execute() @@ -1678,6 +1701,12 @@ def parallel_log_callback(message: str, level: str = "info"): self._print_success(t("install.package_installed", package=software)) print(f"\n{t('progress.completed_in', seconds=f'{result.total_duration:.2f}')}") + # Display peak usage if monitoring was enabled + if monitor and result.peak_cpu is not None: + print( + f"\nšŸ“Š Peak usage: CPU {result.peak_cpu:.0f}%, RAM {result.peak_ram_gb:.1f} GB" + ) + # Record successful installation if install_id: history.update_installation(install_id, InstallationStatus.SUCCESS) @@ -4810,14 +4839,40 @@ def main(): help="Enable parallel execution for multi-step installs", ) install_parser.add_argument( - "--json", + "--monitor", action="store_true", - help="Output as JSON", + help="Monitor system resources during installation", ) - install_parser.add_argument( - "--mic", - action="store_true", - help="Use voice input for software name (press F9 to record)", + + # Monitor command - real-time system resource monitoring + # Note: Monitoring is client-side using psutil. Daemon integration is intentionally + # out of scope to keep the feature self-contained and avoid cortexd dependencies. + monitor_parser = subparsers.add_parser( + "monitor", + help="Monitor system resource usage", + description="Track CPU, RAM, Disk, and Network usage in real-time.", + ) + monitor_parser.add_argument( + "--duration", + "-d", + type=int, + metavar="SECONDS", + help="Run for fixed duration (seconds); omit for continuous monitoring", + ) + monitor_parser.add_argument( + "--interval", + "-i", + type=float, + default=1.0, + metavar="SECONDS", + help="Sampling interval in seconds (default: 1.0)", + ) + monitor_parser.add_argument( + "--export", + "-e", + type=str, + metavar="FILE", + help="Export metrics to file (JSON or CSV). 
Experimental feature.", ) # Remove command - uninstall with impact analysis @@ -5492,9 +5547,8 @@ def main(): return cli.printer( action=getattr(args, "action", "status"), verbose=getattr(args, "verbose", False) ) - elif args.command == "voice": - model = getattr(args, "model", None) - return cli.voice(continuous=not getattr(args, "single", False), model=model) + elif args.command == "monitor": + return cli.monitor(args) elif args.command == "ask": # Handle --mic flag for voice input if getattr(args, "mic", False): @@ -5561,7 +5615,7 @@ def main(): execute=args.execute, dry_run=args.dry_run, parallel=args.parallel, - json_output=args.json, + monitor=getattr(args, "monitor", False), ) elif args.command == "remove": # Handle --execute flag to override default dry-run diff --git a/cortex/coordinator.py b/cortex/coordinator.py index ac19bf80..4032aedc 100644 --- a/cortex/coordinator.py +++ b/cortex/coordinator.py @@ -46,6 +46,10 @@ class InstallationResult: total_duration: float failed_step: int | None = None error_message: str | None = None + # Monitoring data (optional) + peak_cpu: float | None = None + peak_ram_percent: float | None = None + peak_ram_gb: float | None = None class InstallationCoordinator: @@ -60,13 +64,17 @@ def __init__( enable_rollback: bool = False, log_file: str | None = None, progress_callback: Callable[[int, int, InstallationStep], None] | None = None, + enable_monitoring: bool = False, ): - """Initialize an installation run with optional logging and rollback.""" + """Initialize an installation run with optional logging, rollback, and monitoring.""" self.timeout = timeout self.stop_on_error = stop_on_error self.enable_rollback = enable_rollback self.log_file = log_file self.progress_callback = progress_callback + self.enable_monitoring = enable_monitoring + self._sampler: ResourceSampler | None = None # type: ignore[name-defined] + self._peak_usage: PeakUsage | None = None # type: ignore[name-defined] if descriptions and len(descriptions) != len(commands): raise ValueError("Number of descriptions must match number of commands") @@ -90,6 +98,7 @@ def from_plan( enable_rollback: bool | None = None, log_file: str | None = None, progress_callback: Callable[[int, int, InstallationStep], None] | None = None, + enable_monitoring: bool = False, ) -> "InstallationCoordinator": """Create a coordinator from a structured plan produced by an LLM. 
@@ -124,6 +133,7 @@ def from_plan( ), log_file=log_file, progress_callback=progress_callback, + enable_monitoring=enable_monitoring, ) for rollback_cmd in rollback_commands: @@ -227,6 +237,18 @@ def add_rollback_command(self, command: str): """Register a rollback command executed if a step fails.""" self.rollback_commands.append(command) + def _stop_monitoring_and_get_peaks(self) -> tuple[float | None, float | None, float | None]: + """Stop the sampler and return (peak_cpu, peak_ram_percent, peak_ram_gb).""" + if not self._sampler: + return None, None, None + self._sampler.stop() + self._peak_usage = self._sampler.get_peak_usage() + return ( + self._peak_usage.cpu_percent, + self._peak_usage.ram_percent, + self._peak_usage.ram_used_gb, + ) + def execute(self) -> InstallationResult: """Run each installation step and capture structured results.""" start_time = time.time() @@ -234,6 +256,22 @@ def execute(self) -> InstallationResult: self._log(f"Starting installation with {len(self.steps)} steps") + # Start monitoring if enabled + if self.enable_monitoring: + try: + from cortex.monitor.sampler import ResourceSampler + + self._sampler = ResourceSampler(interval=1.0) + self._sampler.start() + # Only log if sampler actually started + if self._sampler.is_running: + self._log("Resource monitoring started") + else: + self._sampler = None + except ImportError: + self._log("Monitor module not available, skipping monitoring") + self._sampler = None + for i, step in enumerate(self.steps): if self.progress_callback: self.progress_callback(i + 1, len(self.steps), step) @@ -249,6 +287,9 @@ def execute(self) -> InstallationResult: if self.enable_rollback: self._rollback() + # Stop monitoring on failure + peak_cpu, peak_ram_percent, peak_ram_gb = self._stop_monitoring_and_get_peaks() + total_duration = time.time() - start_time self._log(f"Installation failed at step {i + 1}") @@ -258,11 +299,21 @@ def execute(self) -> InstallationResult: total_duration=total_duration, failed_step=i, error_message=step.error or "Command failed", + peak_cpu=peak_cpu, + peak_ram_percent=peak_ram_percent, + peak_ram_gb=peak_ram_gb, ) total_duration = time.time() - start_time all_success = all(s.status == StepStatus.SUCCESS for s in self.steps) + # Stop monitoring and capture peak usage + peak_cpu, peak_ram_percent, peak_ram_gb = self._stop_monitoring_and_get_peaks() + if peak_cpu is not None: + self._log( + f"Monitoring stopped. Peak CPU: {peak_cpu:.1f}%, Peak RAM: {peak_ram_gb:.1f}GB" + ) + if all_success: self._log("Installation completed successfully") else: @@ -276,6 +327,9 @@ def execute(self) -> InstallationResult: error_message=( self.steps[failed_step_index].error if failed_step_index is not None else None ), + peak_cpu=peak_cpu, + peak_ram_percent=peak_ram_percent, + peak_ram_gb=peak_ram_gb, ) def verify_installation(self, verify_commands: list[str]) -> dict[str, bool]: diff --git a/cortex/monitor/__init__.py b/cortex/monitor/__init__.py new file mode 100644 index 00000000..4bfc6616 --- /dev/null +++ b/cortex/monitor/__init__.py @@ -0,0 +1,19 @@ +""" +Cortex Monitor Module + +Real-time system resource monitoring for Cortex Linux. 
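+
+Sampling requires psutil; the terminal UI additionally uses rich when available.
+All metrics are system-wide and collected client-side (no cortexd involvement).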
+""" + +from cortex.monitor.sampler import ( + AlertThresholds, + PeakUsage, + ResourceSample, + ResourceSampler, +) + +__all__ = [ + "AlertThresholds", + "PeakUsage", + "ResourceSample", + "ResourceSampler", +] diff --git a/cortex/monitor/analyzer.py b/cortex/monitor/analyzer.py new file mode 100644 index 00000000..41823ac7 --- /dev/null +++ b/cortex/monitor/analyzer.py @@ -0,0 +1,272 @@ +""" +Performance Analyzer for Cortex Monitor + +Analyzes collected metrics and provides rule-based recommendations. + +Author: Cortex Linux Team +SPDX-License-Identifier: BUSL-1.1 +""" + +import logging +from dataclasses import dataclass + +from cortex.monitor.sampler import PeakUsage, ResourceSample + +logger = logging.getLogger(__name__) + + +@dataclass +class AnalysisResult: + """Result of performance analysis.""" + + recommendations: list[str] + warnings: list[str] + summary: str + + +# Threshold constants for analysis +CPU_HIGH_THRESHOLD = 80.0 +CPU_CRITICAL_THRESHOLD = 95.0 +RAM_HIGH_THRESHOLD = 80.0 +RAM_CRITICAL_THRESHOLD = 95.0 +DISK_LOW_THRESHOLD = 10.0 # Less than 10% free +DISK_IO_HIGH_THRESHOLD = 100 * 1024 * 1024 # 100 MB/s sustained + + +def analyze_samples( + samples: list[ResourceSample], + peak: PeakUsage | None = None, +) -> AnalysisResult: + """ + Analyze collected samples and generate recommendations. + + Args: + samples: List of ResourceSample objects + peak: Optional pre-computed peak usage + + Returns: + AnalysisResult with recommendations and warnings + """ + if not samples: + return AnalysisResult( + recommendations=[], + warnings=["No samples collected for analysis"], + summary="Insufficient data for analysis", + ) + + # Compute peak if not provided + if peak is None: + peak = _compute_peak(samples) + + recommendations = [] + warnings = [] + + # CPU Analysis + cpu_analysis = _analyze_cpu(samples, peak) + recommendations.extend(cpu_analysis["recommendations"]) + warnings.extend(cpu_analysis["warnings"]) + + # RAM Analysis + ram_analysis = _analyze_ram(samples, peak) + recommendations.extend(ram_analysis["recommendations"]) + warnings.extend(ram_analysis["warnings"]) + + # Disk Analysis + disk_analysis = _analyze_disk(samples, peak) + recommendations.extend(disk_analysis["recommendations"]) + warnings.extend(disk_analysis["warnings"]) + + # Network Analysis (informational only) + net_analysis = _analyze_network(samples, peak) + recommendations.extend(net_analysis["recommendations"]) + + # Generate summary + summary = _generate_summary(samples, peak, len(recommendations), len(warnings)) + + return AnalysisResult( + recommendations=recommendations, + warnings=warnings, + summary=summary, + ) + + +def _compute_peak(samples: list[ResourceSample]) -> PeakUsage: + """Compute peak usage from samples.""" + peak = PeakUsage() + for s in samples: + peak.cpu_percent = max(peak.cpu_percent, s.cpu_percent) + peak.ram_percent = max(peak.ram_percent, s.ram_percent) + peak.ram_used_gb = max(peak.ram_used_gb, s.ram_used_gb) + peak.disk_read_rate_max = max(peak.disk_read_rate_max, s.disk_read_rate) + peak.disk_write_rate_max = max(peak.disk_write_rate_max, s.disk_write_rate) + peak.net_recv_rate_max = max(peak.net_recv_rate_max, s.net_recv_rate) + peak.net_sent_rate_max = max(peak.net_sent_rate_max, s.net_sent_rate) + return peak + + +def _analyze_cpu(samples: list[ResourceSample], peak: PeakUsage) -> dict[str, list[str]]: + """Analyze CPU usage patterns.""" + recommendations = [] + warnings = [] + + # Check peak CPU + if peak.cpu_percent >= CPU_CRITICAL_THRESHOLD: + warnings.append(f"āš ļø CPU 
reached critical levels ({peak.cpu_percent:.0f}%)") + recommendations.append( + "Consider reducing parallel build jobs (e.g., make -j2 instead of -j$(nproc))" + ) + elif peak.cpu_percent >= CPU_HIGH_THRESHOLD: + recommendations.append( + f"CPU usage was high ({peak.cpu_percent:.0f}%). " + "Consider scheduling heavy tasks during off-peak hours." + ) + + # Check sustained high CPU + high_cpu_count = sum(1 for s in samples if s.cpu_percent >= CPU_HIGH_THRESHOLD) + if len(samples) > 5 and high_cpu_count / len(samples) > 0.5: + recommendations.append( + "Sustained high CPU usage detected. Consider upgrading CPU or " + "distributing workload across machines." + ) + + return {"recommendations": recommendations, "warnings": warnings} + + +def _analyze_ram(_samples: list[ResourceSample], peak: PeakUsage) -> dict[str, list[str]]: + """Analyze RAM usage patterns.""" + recommendations = [] + warnings = [] + + # Check peak RAM + if peak.ram_percent >= RAM_CRITICAL_THRESHOLD: + warnings.append(f"āš ļø RAM reached critical levels ({peak.ram_percent:.0f}%)") + recommendations.append( + "Memory pressure detected. Consider increasing RAM or enabling swap." + ) + elif peak.ram_percent >= RAM_HIGH_THRESHOLD: + recommendations.append( + f"Memory usage was high ({peak.ram_percent:.0f}%). " + "Close unused applications during installations." + ) + + # Check for potential OOM risk + if peak.ram_percent >= 90: + recommendations.append( + "High memory pressure may cause OOM killer to terminate processes. " + "Consider adding swap space as a safety buffer." + ) + + return {"recommendations": recommendations, "warnings": warnings} + + +def _analyze_disk(samples: list[ResourceSample], peak: PeakUsage) -> dict[str, list[str]]: + """Analyze disk usage patterns.""" + recommendations = [] + warnings = [] + + # Check disk space + if samples: + latest = samples[-1] + free_percent = 100 - latest.disk_percent + if free_percent < DISK_LOW_THRESHOLD: + warnings.append(f"āš ļø Low disk space ({free_percent:.0f}% free)") + recommendations.append( + "Free up disk space before continuing installations. " + "Run 'sudo apt autoremove' and 'sudo apt clean'." + ) + + # Check disk I/O + if peak.disk_write_rate_max >= DISK_IO_HIGH_THRESHOLD: + recommendations.append( + "High disk I/O detected. Consider using an SSD for faster installations." + ) + + return {"recommendations": recommendations, "warnings": warnings} + + +def _analyze_network(_samples: list[ResourceSample], peak: PeakUsage) -> dict[str, list[str]]: + """Analyze network usage patterns.""" + recommendations = [] + + # High network throughput is generally fine, just informational + if peak.net_recv_rate_max >= 50 * 1024 * 1024: # 50 MB/s + # This is actually good - fast downloads + pass + + return {"recommendations": recommendations} + + +def _generate_summary( + _samples: list[ResourceSample], + peak: PeakUsage, + rec_count: int, + warn_count: int, +) -> str: + """Generate analysis summary.""" + if warn_count > 0: + status = "āš ļø Issues detected" + elif rec_count > 0: + status = "šŸ’” Recommendations available" + else: + status = "āœ… System healthy" + + return ( + f"{status} | " + f"Peak: CPU {peak.cpu_percent:.0f}%, " + f"RAM {peak.ram_percent:.0f}% ({peak.ram_used_gb:.1f} GB)" + ) + + +def _print_rich(result: AnalysisResult) -> bool: + """Try to print using rich formatting. 
Returns True if successful.""" + try: + from rich.console import Console + + console = Console() + console.print() + console.print(f"[bold]{result.summary}[/bold]") + + if result.warnings: + console.print() + console.print("[bold yellow]Warnings:[/bold yellow]") + for warning in result.warnings: + console.print(f" {warning}") + + if result.recommendations: + console.print() + console.print("[bold cyan]Recommendations:[/bold cyan]") + for i, rec in enumerate(result.recommendations, 1): + console.print(f" {i}. {rec}") + + return True + except ImportError: + return False + + +def _print_plain(result: AnalysisResult) -> None: + """Print analysis results in plain text format.""" + print() + print(result.summary) + + if result.warnings: + print("\nWarnings:") + for warning in result.warnings: + print(f" {warning}") + + if result.recommendations: + print("\nRecommendations:") + for i, rec in enumerate(result.recommendations, 1): + print(f" {i}. {rec}") + + +def print_analysis(result: AnalysisResult, use_rich: bool = True) -> None: + """ + Print analysis results to console. + + Args: + result: AnalysisResult to print + use_rich: Whether to use rich formatting + """ + if use_rich and _print_rich(result): + return + _print_plain(result) diff --git a/cortex/monitor/exporter.py b/cortex/monitor/exporter.py new file mode 100644 index 00000000..23518d62 --- /dev/null +++ b/cortex/monitor/exporter.py @@ -0,0 +1,161 @@ +""" +Metrics Exporter for Cortex Monitor + +Exports monitoring data to JSON and CSV formats. + +Author: Cortex Linux Team +SPDX-License-Identifier: BUSL-1.1 +""" + +import csv +import json +import logging +from datetime import datetime +from pathlib import Path + +from cortex.monitor.sampler import PeakUsage, ResourceSample + +logger = logging.getLogger(__name__) + + +def export_samples( + samples: list[ResourceSample], + filepath: str, + peak: PeakUsage | None = None, + metadata: dict | None = None, +) -> None: + """ + Export monitoring samples to a file. + + Args: + samples: List of ResourceSample objects + filepath: Output file path (.json or .csv) + peak: Optional peak usage statistics + metadata: Optional metadata dict to include + + Raises: + ValueError: If file format is not supported + """ + path = Path(filepath) + suffix = path.suffix.lower() + + if suffix == ".json": + _export_json(samples, filepath, peak, metadata) + elif suffix == ".csv": + _export_csv(samples, filepath, peak, metadata) + else: + # Default to JSON if no extension + if not suffix: + filepath = filepath + ".json" + _export_json(samples, filepath, peak, metadata) + else: + raise ValueError(f"Unsupported export format: {suffix}. 
Use .json or .csv") + + logger.info(f"Exported {len(samples)} samples to {filepath}") + + +def _export_json( + samples: list[ResourceSample], + filepath: str, + peak: PeakUsage | None = None, + metadata: dict | None = None, +) -> None: + """Export samples to JSON format.""" + data = { + "metadata": { + "exported_at": datetime.now().isoformat(), + "sample_count": len(samples), + "format_version": "1.0", + **(metadata or {}), + }, + "peak_usage": { + "cpu_percent": peak.cpu_percent if peak else 0.0, + "ram_percent": peak.ram_percent if peak else 0.0, + "ram_used_gb": peak.ram_used_gb if peak else 0.0, + "disk_read_rate_max": peak.disk_read_rate_max if peak else 0.0, + "disk_write_rate_max": peak.disk_write_rate_max if peak else 0.0, + "net_recv_rate_max": peak.net_recv_rate_max if peak else 0.0, + "net_sent_rate_max": peak.net_sent_rate_max if peak else 0.0, + }, + "samples": [ + { + "timestamp": sample.timestamp, + "cpu_percent": sample.cpu_percent, + "cpu_count": sample.cpu_count, + "ram_used_gb": round(sample.ram_used_gb, 2), + "ram_total_gb": round(sample.ram_total_gb, 2), + "ram_percent": round(sample.ram_percent, 1), + "disk_used_gb": round(sample.disk_used_gb, 2), + "disk_total_gb": round(sample.disk_total_gb, 2), + "disk_percent": round(sample.disk_percent, 1), + "disk_read_rate": round(sample.disk_read_rate, 2), + "disk_write_rate": round(sample.disk_write_rate, 2), + "net_recv_rate": round(sample.net_recv_rate, 2), + "net_sent_rate": round(sample.net_sent_rate, 2), + } + for sample in samples + ], + } + + with open(filepath, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2) + + +def _export_csv( + samples: list[ResourceSample], + filepath: str, + peak: PeakUsage | None = None, + metadata: dict | None = None, +) -> None: + """Export samples to CSV format.""" + fieldnames = [ + "timestamp", + "cpu_percent", + "cpu_count", + "ram_used_gb", + "ram_total_gb", + "ram_percent", + "disk_used_gb", + "disk_total_gb", + "disk_percent", + "disk_read_rate", + "disk_write_rate", + "net_recv_rate", + "net_sent_rate", + ] + + with open(filepath, "w", newline="", encoding="utf-8") as f: + # Write metadata as comments + f.write("# Cortex Monitor Export\n") + f.write(f"# Exported: {datetime.now().isoformat()}\n") + f.write(f"# Samples: {len(samples)}\n") + if peak: + f.write(f"# Peak CPU: {peak.cpu_percent:.1f}%\n") + f.write(f"# Peak RAM: {peak.ram_used_gb:.1f} GB ({peak.ram_percent:.1f}%)\n") + # Write user-supplied metadata + if metadata: + for key, value in metadata.items(): + f.write(f"# {key}: {value}\n") + f.write("#\n") + + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + + for sample in samples: + writer.writerow( + { + "timestamp": sample.timestamp, + "cpu_percent": round(sample.cpu_percent, 1), + "cpu_count": sample.cpu_count, + "ram_used_gb": round(sample.ram_used_gb, 2), + "ram_total_gb": round(sample.ram_total_gb, 2), + "ram_percent": round(sample.ram_percent, 1), + "disk_used_gb": round(sample.disk_used_gb, 2), + "disk_total_gb": round(sample.disk_total_gb, 2), + "disk_percent": round(sample.disk_percent, 1), + "disk_read_rate": round(sample.disk_read_rate, 2), + "disk_write_rate": round(sample.disk_write_rate, 2), + "net_recv_rate": round(sample.net_recv_rate, 2), + "net_sent_rate": round(sample.net_sent_rate, 2), + } + ) diff --git a/cortex/monitor/monitor_ui.py b/cortex/monitor/monitor_ui.py new file mode 100644 index 00000000..2b5bfd13 --- /dev/null +++ b/cortex/monitor/monitor_ui.py @@ -0,0 +1,373 @@ +""" +Monitor UI for Cortex Monitor + 
+Real-time terminal UI using rich.Live for system resource monitoring. +Follows patterns from cortex/dashboard.py. + +Author: Cortex Linux Team +SPDX-License-Identifier: BUSL-1.1 +""" + +import logging +import sys +import time +from datetime import datetime + +from cortex.monitor.sampler import AlertThresholds, PeakUsage, ResourceSample, ResourceSampler + +logger = logging.getLogger(__name__) + +# Try to import rich components +try: + from rich.console import Console, Group + from rich.live import Live + from rich.panel import Panel + from rich.table import Table + from rich.text import Text + + RICH_AVAILABLE = True +except ImportError: + RICH_AVAILABLE = False + + +# UI Constants +BAR_WIDTH = 30 +# Note: Color thresholds are derived from AlertThresholds in _create_bar() +# to maintain single source of truth for alert configuration. + + +class MonitorUI: + """ + Real-time system monitoring UI using rich.Live. + + Displays CPU, RAM, Disk, and Network metrics with progress bars + and alerts. Handles Ctrl+C gracefully. + + Example: + sampler = ResourceSampler(interval=1.0) + ui = MonitorUI(sampler) + ui.run(duration=30) # Run for 30 seconds + """ + + def __init__( + self, + sampler: ResourceSampler, + alert_thresholds: AlertThresholds | None = None, + console: "Console | None" = None, + ): + """ + Initialize the monitor UI. + + Args: + sampler: ResourceSampler instance to get metrics from + alert_thresholds: Optional custom alert thresholds + console: Optional rich Console (creates new one if None) + """ + self.sampler = sampler + self.thresholds = alert_thresholds or AlertThresholds() + self.console = console or (Console() if RICH_AVAILABLE else None) + + self._running = False + self._start_time: float | None = None + self._alerts: list[str] = [] + + def run(self, duration: int | None = None) -> PeakUsage: + """ + Run the monitoring UI. 
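+
+        Falls back to plain-text line output when rich is unavailable or
+        stdout is not a TTY (e.g. piped output).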
+ + Args: + duration: Optional duration in seconds (None = run until Ctrl+C) + + Returns: + PeakUsage statistics from the monitoring session + """ + if not RICH_AVAILABLE: + return self._run_fallback(duration) + + # Fall back to simple output for non-TTY environments (piped output) + if not sys.stdout.isatty(): + return self._run_fallback(duration) + + self._running = True + self._start_time = time.time() + self._alerts = [] + + # Start the sampler + self.sampler.start() + + try: + with Live( + self._render(), + console=self.console, + refresh_per_second=2, + screen=False, + ) as live: + while self._running: + # Check duration limit + if duration and (time.time() - self._start_time) >= duration: + break + + # Update display + live.update(self._render()) + + # Small sleep to prevent busy loop + time.sleep(0.1) + + except KeyboardInterrupt: + self.console.print("\n[dim]Monitoring stopped by user[/dim]") + finally: + self.sampler.stop() + self._running = False + + # Show summary + peak = self.sampler.get_peak_usage() + self._show_summary(peak) + + return peak + + def _run_fallback(self, duration: int | None = None) -> PeakUsage: + """Fallback for non-TTY or when rich is unavailable.""" + self._running = True + self._start_time = time.time() + + self.sampler.start() + print("Cortex Monitor (non-TTY mode)") + print("Press Ctrl+C to stop\n") + + try: + while self._running: + if duration and (time.time() - self._start_time) >= duration: + break + + sample = self.sampler.get_latest_sample() + if sample: + elapsed = time.time() - self._start_time + sys.stdout.write( + f"\r[{elapsed:.0f}s] " + f"CPU: {sample.cpu_percent:.0f}% | " + f"RAM: {sample.ram_used_gb:.1f}/{sample.ram_total_gb:.1f}GB | " + f"Disk: {sample.disk_percent:.0f}%" + ) + sys.stdout.flush() + + time.sleep(1.0) + + except KeyboardInterrupt: + print("\nMonitoring stopped by user") + finally: + self.sampler.stop() + self._running = False + + print() + peak = self.sampler.get_peak_usage() + print(f"Peak: CPU {peak.cpu_percent:.0f}%, RAM {peak.ram_used_gb:.1f}GB") + return peak + + def _render(self) -> Panel: + """Render the complete monitoring panel.""" + sample = self.sampler.get_latest_sample() + + # Build content + content_parts = [] + + # Header with time + elapsed = time.time() - self._start_time if self._start_time else 0 + header = Text() + header.append("šŸ–„ļø System Monitor", style="bold cyan") + header.append(f" • {datetime.now().strftime('%H:%M:%S')}", style="dim") + header.append(f" • {elapsed:.0f}s elapsed", style="dim") + content_parts.append(header) + content_parts.append(Text()) # Spacer + + if sample: + # CPU + cpu_bar = self._create_bar("CPU", sample.cpu_percent, sample.cpu_count, metric="cpu") + content_parts.append(cpu_bar) + + # RAM + ram_bar = self._create_bar( + "RAM", + sample.ram_percent, + suffix=f"{sample.ram_used_gb:.1f}/{sample.ram_total_gb:.1f} GB", + metric="ram", + ) + content_parts.append(ram_bar) + + # Disk + disk_bar = self._create_bar( + "Disk", + sample.disk_percent, + suffix=f"{sample.disk_used_gb:.0f}/{sample.disk_total_gb:.0f} GB", + metric="disk", + ) + content_parts.append(disk_bar) + + content_parts.append(Text()) # Spacer + + # I/O Table + io_table = self._create_io_table(sample) + content_parts.append(io_table) + + # Alerts + alerts = self.sampler.check_alerts(sample) + if alerts: + content_parts.append(Text()) + for alert in alerts: + content_parts.append(Text(alert, style="yellow")) + else: + content_parts.append(Text("Collecting metrics...", style="dim")) + + return Panel( + 
Group(*content_parts), + title="[bold]Cortex Monitor[/bold]", + subtitle="[dim]Press Ctrl+C to stop[/dim]", + border_style="blue", + ) + + def _create_bar( + self, + label: str, + percent: float, + cores: int | None = None, + suffix: str = "", + metric: str = "ram", + ) -> Text: + """Create a progress bar with label. Color derived from AlertThresholds. + + Args: + metric: One of 'cpu', 'ram', 'disk' to select appropriate thresholds. + """ + # Clamp percent to [0, 100] to prevent bar overflow + percent = max(0.0, min(100.0, percent)) + + # Use metric-specific thresholds + if metric == "cpu": + warning = self.thresholds.cpu_warning + critical = self.thresholds.cpu_critical + elif metric == "disk": + warning = self.thresholds.disk_warning + critical = self.thresholds.disk_critical + else: # default to RAM + warning = self.thresholds.ram_warning + critical = self.thresholds.ram_critical + + if percent >= critical: + color = "red" + elif percent >= warning: + color = "yellow" + else: + color = "green" + + # Build bar with safe width calculation + filled = int((percent / 100) * BAR_WIDTH) + filled = min(filled, BAR_WIDTH) # Ensure we don't exceed bar width + bar = "ā–ˆ" * filled + "ā–‘" * (BAR_WIDTH - filled) + + # Format label + label_text = f"{label:>6}: " + + result = Text() + result.append(label_text, style="bold") + result.append(bar, style=color) + result.append(f" {percent:5.1f}%", style=color) + + if cores: + result.append(f" ({cores} cores)", style="dim") + elif suffix: + result.append(f" ({suffix})", style="dim") + + return result + + def _create_io_table(self, sample: ResourceSample) -> Table: + """Create I/O statistics table.""" + table = Table(show_header=False, box=None, padding=(0, 2)) + table.add_column("Type", style="dim") + table.add_column("Read/Recv", style="cyan") + table.add_column("Write/Send", style="magenta") + + # Disk I/O + disk_read = ResourceSampler.format_bytes_rate(sample.disk_read_rate) + disk_write = ResourceSampler.format_bytes_rate(sample.disk_write_rate) + table.add_row("Disk I/O:", f"↓ {disk_read}", f"↑ {disk_write}") + + # Network I/O + net_recv = ResourceSampler.format_bytes_rate(sample.net_recv_rate) + net_sent = ResourceSampler.format_bytes_rate(sample.net_sent_rate) + table.add_row("Network:", f"↓ {net_recv}", f"↑ {net_sent}") + + return table + + def _show_summary(self, peak: PeakUsage) -> None: + """Show summary after monitoring ends.""" + if not RICH_AVAILABLE or not self.console: + return + + samples_count = self.sampler.get_sample_count() + duration = time.time() - self._start_time if self._start_time else 0 + + self.console.print() + self.console.print("[bold]šŸ“Š Monitoring Summary[/bold]") + self.console.print(f" Duration: {duration:.0f} seconds") + self.console.print(f" Samples: {samples_count}") + self.console.print() + self.console.print("[bold]Peak Usage:[/bold]") + self.console.print(f" CPU: {peak.cpu_percent:.1f}%") + self.console.print(f" RAM: {peak.ram_used_gb:.1f} GB ({peak.ram_percent:.1f}%)") + + if peak.disk_write_rate_max > 0: + disk_write = ResourceSampler.format_bytes_rate(peak.disk_write_rate_max) + self.console.print(f" Disk Write Peak: {disk_write}") + + if peak.net_recv_rate_max > 0: + net_recv = ResourceSampler.format_bytes_rate(peak.net_recv_rate_max) + self.console.print(f" Network Recv Peak: {net_recv}") + + def stop(self) -> None: + """Stop the monitoring UI.""" + self._running = False + + +def run_standalone_monitor( + duration: int | None = None, + interval: float = 1.0, + export_path: str | None = None, +) -> int: + """ 
+ Run standalone monitoring. + + Args: + duration: Optional duration in seconds + interval: Sampling interval in seconds + export_path: Optional path to export metrics + + Returns: + Exit code (0 for success) + """ + sampler = ResourceSampler(interval=interval) + ui = MonitorUI(sampler) + + try: + peak = ui.run(duration=duration) + + # Export if requested + if export_path: + try: + from cortex.monitor.exporter import export_samples + + samples = sampler.get_samples() + # Resolve the actual export path (exporter may add extension) + actual_path = export_path + if not export_path.endswith((".json", ".csv")): + actual_path = export_path + ".json" + export_samples(samples, export_path, peak) + if RICH_AVAILABLE and ui.console: + ui.console.print(f"[green]āœ“[/green] Metrics exported to {actual_path}") + except Exception as e: + logger.error(f"Export failed: {e}") + return 1 + + return 0 + + except Exception as e: + logger.error(f"Monitor error: {e}") + return 1 diff --git a/cortex/monitor/sampler.py b/cortex/monitor/sampler.py new file mode 100644 index 00000000..fd459851 --- /dev/null +++ b/cortex/monitor/sampler.py @@ -0,0 +1,423 @@ +""" +Resource Sampler for Cortex Monitor + +Thread-safe system resource sampling using psutil. +Collects CPU, RAM, Disk, and Network metrics at configurable intervals. + +Important Notes: + - All metrics are SYSTEM-WIDE, not per-process + - Disk metrics apply to the root filesystem (/) + - Disk and Network I/O are cumulative system totals + - Monitoring is client-side only; daemon integration is out of scope + +Author: Cortex Linux Team +SPDX-License-Identifier: BUSL-1.1 +""" + +import logging +import threading +import time +from collections.abc import Callable +from dataclasses import dataclass, field + +logger = logging.getLogger(__name__) + +# Default maximum samples to prevent unbounded memory growth. +# At 1 sample/second, 3600 samples = 1 hour of monitoring. +# Override via max_samples parameter if needed. +DEFAULT_MAX_SAMPLES = 3600 + +# Try to import psutil, provide fallback for testing +try: + import psutil + + PSUTIL_AVAILABLE = True +except ImportError: + PSUTIL_AVAILABLE = False + psutil = None # type: ignore + + +@dataclass +class ResourceSample: + """A single snapshot of system resource usage.""" + + timestamp: float + # CPU metrics + cpu_percent: float + cpu_count: int + # RAM metrics + ram_used_gb: float + ram_total_gb: float + ram_percent: float + # Disk metrics + disk_used_gb: float + disk_total_gb: float + disk_percent: float + disk_read_bytes: int = 0 + disk_write_bytes: int = 0 + # Network metrics + net_recv_bytes: int = 0 + net_sent_bytes: int = 0 + # Computed I/O rates (bytes/sec, calculated from delta) + disk_read_rate: float = 0.0 + disk_write_rate: float = 0.0 + net_recv_rate: float = 0.0 + net_sent_rate: float = 0.0 + + +@dataclass +class PeakUsage: + """Peak resource usage during a monitoring session.""" + + cpu_percent: float = 0.0 + ram_percent: float = 0.0 + ram_used_gb: float = 0.0 + disk_read_rate_max: float = 0.0 + disk_write_rate_max: float = 0.0 + net_recv_rate_max: float = 0.0 + net_sent_rate_max: float = 0.0 + + +@dataclass +class AlertThresholds: + """Configurable thresholds for resource alerts.""" + + cpu_warning: float = 80.0 + cpu_critical: float = 95.0 + ram_warning: float = 80.0 + ram_critical: float = 95.0 + disk_warning: float = 90.0 + disk_critical: float = 95.0 + + +class ResourceSampler: + """ + Thread-safe system resource sampler. + + Collects system metrics in a background thread at configurable intervals. 
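+    I/O rates are derived from deltas between consecutive samples, so the
+    first sample of a session reports zero for all rate fields.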
+ Safe to start/stop multiple times. + + Example: + sampler = ResourceSampler(interval=1.0) + sampler.start() + time.sleep(5) + sampler.stop() + for sample in sampler.get_samples(): + print(f"CPU: {sample.cpu_percent}%") + """ + + # Bytes conversion constants + BYTES_PER_GB = 1024**3 + BYTES_PER_MB = 1024**2 + + def __init__( + self, + interval: float = 1.0, + on_sample: Callable[[ResourceSample], None] | None = None, + alert_thresholds: AlertThresholds | None = None, + max_samples: int | None = None, + ): + """ + Initialize the resource sampler. + + Args: + interval: Sampling interval in seconds (default: 1.0, min: 0.1) + on_sample: Optional callback invoked after each sample + alert_thresholds: Optional custom alert thresholds (single source of truth) + max_samples: Maximum samples to retain in memory (default: 3600). + Oldest samples are discarded when limit is reached. + Set to None for unlimited (use with caution). + """ + self.interval = max(0.1, interval) # Minimum 100ms + self.on_sample = on_sample + self.thresholds = alert_thresholds or AlertThresholds() + # Guard against negative values; None means use default, 0 means no storage + if max_samples is None: + self.max_samples = DEFAULT_MAX_SAMPLES + else: + self.max_samples = max(0, max_samples) + + # Thread synchronization + self._lock = threading.Lock() + self._stop_event = threading.Event() + self._thread: threading.Thread | None = None + + # Sample storage + self._samples: list[ResourceSample] = [] + self._peak = PeakUsage() + + # Previous I/O counters for rate calculation + self._prev_disk_io: tuple[int, int] | None = None + self._prev_net_io: tuple[int, int] | None = None + self._prev_sample_time: float | None = None + + # State + self._running = False + self._cpu_initialized = False + + @property + def is_running(self) -> bool: + """Check if the sampler is currently running.""" + return self._running + + def start(self) -> None: + """Start the background sampling thread.""" + if self._running: + logger.warning("Sampler already running") + return + + if not PSUTIL_AVAILABLE: + logger.error("psutil not available, cannot start sampler") + return + + self._stop_event.clear() + self._running = True + + # Reset state for new session + with self._lock: + self._samples = [] + self._peak = PeakUsage() + self._prev_disk_io = None + self._prev_net_io = None + self._prev_sample_time = None + self._cpu_initialized = False + + self._thread = threading.Thread(target=self._sample_loop, daemon=True) + self._thread.start() + logger.debug(f"Sampler started with interval={self.interval}s") + + def stop(self) -> None: + """Stop the background sampling thread.""" + if not self._running: + return + + self._stop_event.set() + self._running = False + + if self._thread and self._thread.is_alive(): + self._thread.join(timeout=2.0) + + self._thread = None + logger.debug("Sampler stopped") + + def _store_sample(self, sample: ResourceSample) -> None: + """Store sample and update peak under lock.""" + with self._lock: + if self.max_samples > 0: + self._samples.append(sample) + if len(self._samples) > self.max_samples: + self._samples = self._samples[-self.max_samples :] + self._update_peak(sample) + + def _invoke_callback(self, sample: ResourceSample) -> None: + """Invoke on_sample callback with error handling.""" + if not self.on_sample: + return + try: + self.on_sample(sample) + except Exception as e: + logger.warning(f"on_sample callback error: {e}") + + def _sample_loop(self) -> None: + """Background thread loop that collects samples.""" + while not 
self._stop_event.is_set(): + try: + sample = self._collect_sample() + if sample: + self._store_sample(sample) + self._invoke_callback(sample) + except Exception as e: + logger.error(f"Sampling error: {e}", exc_info=True) + + self._stop_event.wait(timeout=self.interval) + + def _get_disk_metrics(self) -> tuple[float, float, float]: + """Get disk usage metrics. Returns (used_gb, total_gb, percent).""" + try: + disk = psutil.disk_usage("/") + return ( + disk.used / self.BYTES_PER_GB, + disk.total / self.BYTES_PER_GB, + disk.percent, + ) + except Exception: + return 0.0, 0.0, 0.0 + + def _get_io_counters(self) -> tuple[int, int, int, int]: + """Get disk and network I/O counters. Returns (disk_read, disk_write, net_recv, net_sent).""" + try: + disk_io = psutil.disk_io_counters() + disk_read = disk_io.read_bytes if disk_io else 0 + disk_write = disk_io.write_bytes if disk_io else 0 + except Exception: + disk_read, disk_write = 0, 0 + + try: + net_io = psutil.net_io_counters() + net_recv = net_io.bytes_recv if net_io else 0 + net_sent = net_io.bytes_sent if net_io else 0 + except Exception: + net_recv, net_sent = 0, 0 + + return disk_read, disk_write, net_recv, net_sent + + def _calculate_io_rates( + self, now: float, disk_read: int, disk_write: int, net_recv: int, net_sent: int + ) -> tuple[float, float, float, float]: + """Calculate I/O rates based on previous sample. Returns (disk_read_rate, disk_write_rate, net_recv_rate, net_sent_rate).""" + if not (self._prev_sample_time and self._prev_disk_io and self._prev_net_io): + return 0.0, 0.0, 0.0, 0.0 + + time_delta = now - self._prev_sample_time + if time_delta <= 0: + return 0.0, 0.0, 0.0, 0.0 + + return ( + (disk_read - self._prev_disk_io[0]) / time_delta, + (disk_write - self._prev_disk_io[1]) / time_delta, + (net_recv - self._prev_net_io[0]) / time_delta, + (net_sent - self._prev_net_io[1]) / time_delta, + ) + + def _collect_sample(self) -> ResourceSample | None: + """Collect a single resource sample.""" + if not PSUTIL_AVAILABLE: + return None + + now = time.time() + + try: + # CPU - need to initialize first for accurate readings + if not self._cpu_initialized: + psutil.cpu_percent(interval=0.1) + self._cpu_initialized = True + + cpu_percent = psutil.cpu_percent(interval=None) or 0.0 + cpu_count = psutil.cpu_count() or 1 + + # RAM + mem = psutil.virtual_memory() + ram_used_gb = mem.used / self.BYTES_PER_GB + ram_total_gb = mem.total / self.BYTES_PER_GB + ram_percent = mem.percent + + # Disk and I/O metrics + disk_used_gb, disk_total_gb, disk_percent = self._get_disk_metrics() + disk_read, disk_write, net_recv, net_sent = self._get_io_counters() + disk_read_rate, disk_write_rate, net_recv_rate, net_sent_rate = ( + self._calculate_io_rates(now, disk_read, disk_write, net_recv, net_sent) + ) + + # Store current values for next rate calculation + self._prev_disk_io = (disk_read, disk_write) + self._prev_net_io = (net_recv, net_sent) + self._prev_sample_time = now + + return ResourceSample( + timestamp=now, + cpu_percent=max(0.0, min(100.0, cpu_percent)), + cpu_count=cpu_count, + ram_used_gb=ram_used_gb, + ram_total_gb=ram_total_gb, + ram_percent=max(0.0, min(100.0, ram_percent)), + disk_used_gb=disk_used_gb, + disk_total_gb=disk_total_gb, + disk_percent=max(0.0, min(100.0, disk_percent)), + disk_read_bytes=disk_read, + disk_write_bytes=disk_write, + net_recv_bytes=net_recv, + net_sent_bytes=net_sent, + disk_read_rate=max(0, disk_read_rate), + disk_write_rate=max(0, disk_write_rate), + net_recv_rate=max(0, net_recv_rate), + net_sent_rate=max(0, 
net_sent_rate), + ) + + except Exception as e: + logger.error(f"Failed to collect sample: {e}") + return None + + def _update_peak(self, sample: ResourceSample) -> None: + """Update peak usage values.""" + self._peak.cpu_percent = max(self._peak.cpu_percent, sample.cpu_percent) + self._peak.ram_percent = max(self._peak.ram_percent, sample.ram_percent) + self._peak.ram_used_gb = max(self._peak.ram_used_gb, sample.ram_used_gb) + self._peak.disk_read_rate_max = max(self._peak.disk_read_rate_max, sample.disk_read_rate) + self._peak.disk_write_rate_max = max(self._peak.disk_write_rate_max, sample.disk_write_rate) + self._peak.net_recv_rate_max = max(self._peak.net_recv_rate_max, sample.net_recv_rate) + self._peak.net_sent_rate_max = max(self._peak.net_sent_rate_max, sample.net_sent_rate) + + def get_samples(self) -> list[ResourceSample]: + """Get all collected samples (thread-safe copy).""" + with self._lock: + return list(self._samples) + + def get_latest_sample(self) -> ResourceSample | None: + """Get the most recent sample.""" + with self._lock: + return self._samples[-1] if self._samples else None + + def get_peak_usage(self) -> PeakUsage: + """Get peak usage statistics.""" + with self._lock: + return PeakUsage( + cpu_percent=self._peak.cpu_percent, + ram_percent=self._peak.ram_percent, + ram_used_gb=self._peak.ram_used_gb, + disk_read_rate_max=self._peak.disk_read_rate_max, + disk_write_rate_max=self._peak.disk_write_rate_max, + net_recv_rate_max=self._peak.net_recv_rate_max, + net_sent_rate_max=self._peak.net_sent_rate_max, + ) + + def get_sample_count(self) -> int: + """Get the number of collected samples.""" + with self._lock: + return len(self._samples) + + def check_alerts(self, sample: ResourceSample | None = None) -> list[str]: + """ + Check for resource alerts based on current or provided sample. + + Returns: + List of alert messages (empty if no alerts) + """ + if sample is None: + sample = self.get_latest_sample() + + if sample is None: + return [] + + alerts = [] + + # CPU alerts + if sample.cpu_percent >= self.thresholds.cpu_critical: + alerts.append(f"āš ļø CRITICAL: CPU at {sample.cpu_percent:.0f}%") + elif sample.cpu_percent >= self.thresholds.cpu_warning: + alerts.append(f"⚔ CPU high: {sample.cpu_percent:.0f}%") + + # RAM alerts + if sample.ram_percent >= self.thresholds.ram_critical: + alerts.append(f"āš ļø CRITICAL: RAM at {sample.ram_percent:.0f}%") + elif sample.ram_percent >= self.thresholds.ram_warning: + alerts.append(f"⚔ RAM high: {sample.ram_percent:.0f}%") + + # Disk alerts + if sample.disk_percent >= self.thresholds.disk_critical: + alerts.append(f"āš ļø CRITICAL: Disk at {sample.disk_percent:.0f}%") + elif sample.disk_percent >= self.thresholds.disk_warning: + alerts.append(f"⚔ Disk low: {sample.disk_percent:.0f}% used") + + return alerts + + @staticmethod + def format_bytes_rate(bytes_per_sec: float) -> str: + """Format bytes/sec as human-readable string.""" + if bytes_per_sec >= 1024**3: + return f"{bytes_per_sec / 1024**3:.1f} GB/s" + elif bytes_per_sec >= 1024**2: + return f"{bytes_per_sec / 1024**2:.1f} MB/s" + elif bytes_per_sec >= 1024: + return f"{bytes_per_sec / 1024:.1f} KB/s" + else: + return f"{bytes_per_sec:.0f} B/s" diff --git a/cortex/monitor/storage.py b/cortex/monitor/storage.py new file mode 100644 index 00000000..a7214425 --- /dev/null +++ b/cortex/monitor/storage.py @@ -0,0 +1,463 @@ +""" +Storage Module for Cortex Monitor + +Persists monitoring sessions to SQLite database. +Extends the existing installation_history.py schema. 
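+Two tables are used: monitor_sessions (one row per session, including peak
+usage) and resource_metrics (one row per collected sample, keyed by session_id).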
+ +Author: Cortex Linux Team +SPDX-License-Identifier: BUSL-1.1 +""" + +import json +import logging +import sqlite3 +import uuid +from datetime import datetime +from pathlib import Path +from typing import Any + +from cortex.monitor.sampler import PeakUsage, ResourceSample + +logger = logging.getLogger(__name__) + +# Default database path (same as installation_history.py) +DEFAULT_DB_PATH = "/var/lib/cortex/history.db" +USER_DB_PATH = Path.home() / ".cortex" / "history.db" + + +class MonitorStorage: + """ + Persistent storage for monitoring sessions. + + Stores monitor sessions and individual samples in SQLite. + Uses the same database as installation history for consistency. + """ + + def __init__(self, db_path: str | None = None): + """ + Initialize monitor storage. + + Args: + db_path: Optional custom database path + """ + self.db_path = db_path or self._get_db_path() + self._ensure_tables() + + def _get_db_path(self) -> str: + """Get the database path, with fallback to user directory.""" + import os + + db_path = Path(DEFAULT_DB_PATH) + + # Check if system path is writable using os.access + if db_path.parent.exists() and db_path.parent.is_dir(): + # Check if we can write to the directory (or to the file if it exists) + if db_path.exists(): + if os.access(db_path, os.W_OK): + return str(db_path) + elif os.access(db_path.parent, os.W_OK): + return str(db_path) + + # Fall back to user directory + USER_DB_PATH.parent.mkdir(parents=True, exist_ok=True) + return str(USER_DB_PATH) + + def _ensure_tables(self) -> None: + """Ensure required tables exist.""" + try: + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + + # Create monitor_sessions table + cursor.execute(""" + CREATE TABLE IF NOT EXISTS monitor_sessions ( + session_id TEXT PRIMARY KEY, + start_time TEXT NOT NULL, + end_time TEXT, + mode TEXT, + install_id TEXT, + interval_seconds REAL, + sample_count INTEGER DEFAULT 0, + peak_cpu REAL, + peak_ram_percent REAL, + peak_ram_gb REAL, + metadata TEXT + ) + """) + + # Create resource_metrics table + cursor.execute(""" + CREATE TABLE IF NOT EXISTS resource_metrics ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id TEXT NOT NULL, + timestamp REAL NOT NULL, + cpu_percent REAL, + cpu_count INTEGER, + ram_used_gb REAL, + ram_total_gb REAL, + ram_percent REAL, + disk_used_gb REAL, + disk_total_gb REAL, + disk_percent REAL, + disk_read_rate REAL, + disk_write_rate REAL, + net_recv_rate REAL, + net_sent_rate REAL, + FOREIGN KEY (session_id) REFERENCES monitor_sessions(session_id) + ) + """) + + # Create index for faster queries + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_metrics_session + ON resource_metrics(session_id) + """) + + conn.commit() + logger.debug(f"Monitor tables initialized in {self.db_path}") + + except Exception as e: + logger.error(f"Failed to initialize monitor tables: {e}") + raise + + def create_session( + self, + mode: str = "standalone", + install_id: str | None = None, + interval: float = 1.0, + ) -> str: + """ + Create a new monitoring session. + + Args: + mode: 'standalone' or 'install' + install_id: Optional installation ID (for install mode) + interval: Sampling interval in seconds + + Returns: + Session ID (UUID) + """ + session_id = str(uuid.uuid4()) + start_time = datetime.now().isoformat() + + try: + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + cursor.execute( + """ + INSERT INTO monitor_sessions + (session_id, start_time, mode, install_id, interval_seconds) + VALUES (?, ?, ?, ?, ?) 
+ """, + (session_id, start_time, mode, install_id, interval), + ) + conn.commit() + + logger.debug(f"Created monitor session: {session_id}") + return session_id + + except Exception as e: + logger.error(f"Failed to create session: {e}") + raise + + def save_samples( + self, + session_id: str, + samples: list[ResourceSample], + ) -> int: + """ + Save samples for a session. + + Args: + session_id: Session ID + samples: List of ResourceSample objects + + Returns: + Number of samples saved + """ + if not samples: + return 0 + + try: + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + + for sample in samples: + cursor.execute( + """ + INSERT INTO resource_metrics + (session_id, timestamp, cpu_percent, cpu_count, + ram_used_gb, ram_total_gb, ram_percent, + disk_used_gb, disk_total_gb, disk_percent, + disk_read_rate, disk_write_rate, + net_recv_rate, net_sent_rate) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + session_id, + sample.timestamp, + sample.cpu_percent, + sample.cpu_count, + sample.ram_used_gb, + sample.ram_total_gb, + sample.ram_percent, + sample.disk_used_gb, + sample.disk_total_gb, + sample.disk_percent, + sample.disk_read_rate, + sample.disk_write_rate, + sample.net_recv_rate, + sample.net_sent_rate, + ), + ) + + conn.commit() + + logger.debug(f"Saved {len(samples)} samples for session {session_id}") + return len(samples) + + except Exception as e: + logger.error(f"Failed to save samples: {e}") + raise + + def finalize_session( + self, + session_id: str, + peak: PeakUsage, + sample_count: int, + metadata: dict | None = None, + ) -> None: + """ + Finalize a monitoring session with peak usage and end time. + + Args: + session_id: Session ID + peak: Peak usage statistics + sample_count: Total number of samples + metadata: Optional metadata dict + """ + end_time = datetime.now().isoformat() + + try: + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + cursor.execute( + """ + UPDATE monitor_sessions + SET end_time = ?, + sample_count = ?, + peak_cpu = ?, + peak_ram_percent = ?, + peak_ram_gb = ?, + metadata = ? + WHERE session_id = ? + """, + ( + end_time, + sample_count, + peak.cpu_percent, + peak.ram_percent, + peak.ram_used_gb, + json.dumps(metadata) if metadata else None, + session_id, + ), + ) + conn.commit() + + logger.debug(f"Finalized session {session_id}") + + except Exception as e: + logger.error(f"Failed to finalize session: {e}") + raise + + def get_session(self, session_id: str) -> dict | None: + """ + Get a monitoring session by ID. + + Args: + session_id: Session ID + + Returns: + Session dict or None if not found + """ + try: + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + cursor.execute( + "SELECT * FROM monitor_sessions WHERE session_id = ?", + (session_id,), + ) + row = cursor.fetchone() + + if row: + return dict(row) + return None + + except Exception as e: + logger.error(f"Failed to get session: {e}") + return None + + def get_session_samples(self, session_id: str) -> list[ResourceSample]: + """ + Get all samples for a session. + + Args: + session_id: Session ID + + Returns: + List of ResourceSample objects + """ + try: + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + cursor.execute( + """ + SELECT * FROM resource_metrics + WHERE session_id = ? 
+ ORDER BY timestamp + """, + (session_id,), + ) + rows = cursor.fetchall() + + samples = [] + for row in rows: + samples.append( + ResourceSample( + timestamp=row["timestamp"], + cpu_percent=row["cpu_percent"], + cpu_count=row["cpu_count"], + ram_used_gb=row["ram_used_gb"], + ram_total_gb=row["ram_total_gb"], + ram_percent=row["ram_percent"], + disk_used_gb=row["disk_used_gb"], + disk_total_gb=row["disk_total_gb"], + disk_percent=row["disk_percent"], + disk_read_rate=row["disk_read_rate"], + disk_write_rate=row["disk_write_rate"], + net_recv_rate=row["net_recv_rate"], + net_sent_rate=row["net_sent_rate"], + ) + ) + + return samples + + except Exception as e: + logger.error(f"Failed to get session samples: {e}") + return [] + + def list_sessions(self, limit: int = 20) -> list[dict]: + """ + List recent monitoring sessions. + + Args: + limit: Maximum number of sessions to return + + Returns: + List of session dicts + """ + try: + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + cursor.execute( + """ + SELECT * FROM monitor_sessions + ORDER BY start_time DESC + LIMIT ? + """, + (limit,), + ) + rows = cursor.fetchall() + return [dict(row) for row in rows] + + except Exception as e: + logger.error(f"Failed to list sessions: {e}") + return [] + + def delete_session(self, session_id: str) -> bool: + """ + Delete a monitoring session and its samples. + + Args: + session_id: Session ID + + Returns: + True if deleted, False otherwise + """ + try: + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + + # Delete samples first (foreign key) + cursor.execute( + "DELETE FROM resource_metrics WHERE session_id = ?", + (session_id,), + ) + + # Delete session + cursor.execute( + "DELETE FROM monitor_sessions WHERE session_id = ?", + (session_id,), + ) + + conn.commit() + + logger.info(f"Deleted session {session_id}") + return True + + except Exception as e: + logger.error(f"Failed to delete session: {e}") + return False + + def cleanup_old_sessions(self, days: int = 30) -> int: + """ + Remove sessions older than specified days. + + Args: + days: Delete sessions older than this + + Returns: + Number of sessions deleted + """ + from datetime import timedelta + + cutoff = (datetime.now() - timedelta(days=days)).isoformat() + + try: + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + + # Get session IDs to delete + cursor.execute( + "SELECT session_id FROM monitor_sessions WHERE start_time < ?", + (cutoff,), + ) + session_ids = [row[0] for row in cursor.fetchall()] + + if not session_ids: + return 0 + + # Delete samples + placeholders = ",".join("?" * len(session_ids)) + cursor.execute( + f"DELETE FROM resource_metrics WHERE session_id IN ({placeholders})", + session_ids, + ) + + # Delete sessions + cursor.execute( + f"DELETE FROM monitor_sessions WHERE session_id IN ({placeholders})", + session_ids, + ) + + conn.commit() + + logger.info(f"Cleaned up {len(session_ids)} old sessions") + return len(session_ids) + + except Exception as e: + logger.error(f"Failed to cleanup old sessions: {e}") + return 0 diff --git a/docs/MONITORING.md b/docs/MONITORING.md new file mode 100644 index 00000000..24e4682d --- /dev/null +++ b/docs/MONITORING.md @@ -0,0 +1,160 @@ +# System Resource Monitoring + +Cortex includes built-in system resource monitoring to help track CPU, RAM, disk, and network usage during package installations or as a standalone diagnostic tool. 
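+
+Beyond the `cortex monitor` command documented below, the underlying sampler can
+be driven directly from Python. A minimal sketch using the `ResourceSampler` and
+`export_samples` APIs added by this module (the ten-second loop and the output
+path are illustrative, not defaults):
+
+```python
+import time
+
+from cortex.monitor.exporter import export_samples
+from cortex.monitor.sampler import ResourceSampler
+
+# Sample system-wide metrics once per second in a background thread.
+sampler = ResourceSampler(interval=1.0)
+sampler.start()
+time.sleep(10)
+sampler.stop()
+
+# Peak usage accumulated over the session.
+peak = sampler.get_peak_usage()
+print(f"Peak CPU: {peak.cpu_percent:.0f}%  Peak RAM: {peak.ram_used_gb:.1f} GB")
+
+# Persist raw samples; the .json extension selects the JSON exporter.
+export_samples(sampler.get_samples(), "metrics.json", peak=peak)
+```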
+ +## Overview + +| Capability | Description | +|------------|-------------| +| **CPU Monitoring** | Real-time CPU utilization percentage across all cores | +| **RAM Monitoring** | Memory usage in GB and percentage of total | +| **Disk Monitoring** | Disk space usage and I/O read/write rates | +| **Network Monitoring** | Network throughput (receive/send rates) | +| **Historical Storage** | SQLite-backed session storage in `~/.cortex/history.db` | +| **Rule-Based Analysis** | Automatic warnings and recommendations based on thresholds | +| **Export** | JSON and CSV export formats for external analysis | + +--- + +## CLI Usage + +### Standalone Monitoring + +Run the monitor independently to observe system resources in real-time: + +```bash +# Monitor for 30 seconds with live TUI display +cortex monitor --duration 30 + +# Monitor with custom sampling interval (default: 1 second) +cortex monitor --duration 60 --interval 0.5 +``` + +### Exporting Metrics + +Export collected metrics to JSON or CSV for analysis: + +```bash +# Export to JSON +cortex monitor --duration 60 --export metrics.json + +# Export to CSV +cortex monitor --duration 60 --export metrics.csv + +# Export without extension defaults to JSON +cortex monitor --duration 30 --export report +``` + +### Install-Time Monitoring + +Monitor resource usage during package installation: + +```bash +# Monitor resources while installing nginx +cortex install nginx --monitor + +# Combine with execute flag +cortex install nginx --monitor --execute +``` + +--- + +## Install-Time Monitoring Behavior + +When using `--monitor` with `cortex install`, monitoring runs **in the background**: + +- **No live TUI**: Install-time monitoring does not display a real-time dashboard +- **Peak capture**: Records peak CPU, RAM, and I/O values during installation +- **Summary display**: Shows a summary after installation completes +- **Non-intrusive**: Does not interfere with installation output or prompts + +### Design Rationale + +This design ensures: +1. Installation output remains readable and uncluttered +2. No interference with interactive prompts or progress bars +3. Minimal performance overhead during critical operations +4. 
Clean terminal experience while still capturing useful metrics + +--- + +## Fallback Behavior + +Cortex monitoring degrades gracefully when dependencies are unavailable: + +### psutil Unavailable + +If `psutil` is not installed: +- Monitoring commands will log a warning and exit cleanly +- No crash or error trace +- Install with: `pip install psutil>=6.1.0` + +### rich Unavailable + +If `rich` is not installed: +- Falls back to simple text-based output +- Metrics still collected and exportable +- Install with: `pip install rich>=13.0.0` + +### Non-TTY Environments + +When output is piped or redirected (e.g., `cortex monitor | tee log.txt`): +- Automatically switches to line-based output +- No ANSI escape codes or cursor movements +- Progress updates written as simple text lines + +--- + +## Collected Metrics + +Each sample includes: + +| Metric | Unit | Description | +|--------|------|-------------| +| `cpu_percent` | % | System-wide CPU utilization | +| `cpu_count` | count | Number of logical CPUs | +| `ram_used_gb` | GB | Memory in use | +| `ram_total_gb` | GB | Total system memory | +| `ram_percent` | % | Memory utilization | +| `disk_used_gb` | GB | Disk space used (root partition) | +| `disk_total_gb` | GB | Total disk capacity | +| `disk_percent` | % | Disk utilization | +| `disk_read_rate` | bytes/s | Disk read throughput | +| `disk_write_rate` | bytes/s | Disk write throughput | +| `net_recv_rate` | bytes/s | Network receive throughput | +| `net_sent_rate` | bytes/s | Network send throughput | + +--- + +## Alert Thresholds + +Default thresholds for warnings and critical alerts: + +| Resource | Warning | Critical | +|----------|---------|----------| +| CPU | 80% | 95% | +| RAM | 80% | 95% | +| Disk | 90% | 95% | + +When thresholds are exceeded, the analyzer provides actionable recommendations. + +--- + +## Non-Goals + +The following are **intentionally not included** in this implementation: + +| Non-Goal | Rationale | +|----------|-----------| +| **Per-process monitoring** | Adds complexity; system-wide metrics are sufficient for install tracking | +| **GPU monitoring** | Optional dependency; available via separate dashboard feature | +| **Automatic install cancellation** | Too risky; users should make abort decisions | +| **Daemon-based monitoring** | Out of scope; this is client-side tooling only | + +--- + +## Related Documentation + +- [Commands Reference](COMMANDS.md) +- [Troubleshooting](TROUBLESHOOTING.md) +- [Graceful Degradation](GRACEFUL_DEGRADATION.md) diff --git a/pyproject.toml b/pyproject.toml index f1a5f58a..d87de71e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -56,13 +56,15 @@ dependencies = [ "cryptography>=42.0.0", # Terminal UI and formatting "rich>=13.0.0", + # System resource monitoring (required for cortex monitor) + "psutil>=6.1.0", # Type hints for older Python versions "typing-extensions>=4.0.0", ] [project.optional-dependencies] dashboard = [ - "psutil>=5.9.0", + # GPU monitoring (optional, psutil is now a core dep) "nvidia-ml-py>=12.0.0", ] dev = [ diff --git a/tests/test_monitor.py b/tests/test_monitor.py new file mode 100644 index 00000000..d8422117 --- /dev/null +++ b/tests/test_monitor.py @@ -0,0 +1,1382 @@ +""" +Unit Tests for Cortex Monitor Module + +Tests for sampler, UI, exporter, analyzer, and storage components. 
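+psutil and rich are mocked or monkeypatched where needed, so the suite runs without
+touching real system metrics or a live terminal.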
+Target: >80% coverage for cortex/monitor/ +""" + +import json +import os +import tempfile +import threading +import time +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +# ============================================================================= +# Test Fixtures +# ============================================================================= + + +@pytest.fixture +def mock_psutil(): + """Mock psutil for testing without actual system metrics.""" + with patch.dict("sys.modules", {"psutil": MagicMock()}): + import sys + + mock = sys.modules["psutil"] + + # Mock cpu_percent + mock.cpu_percent = MagicMock(return_value=45.0) + mock.cpu_count = MagicMock(return_value=4) + + # Mock virtual_memory + mem_mock = MagicMock() + mem_mock.used = 8 * 1024**3 # 8 GB + mem_mock.total = 16 * 1024**3 # 16 GB + mem_mock.percent = 50.0 + mock.virtual_memory = MagicMock(return_value=mem_mock) + + # Mock disk_usage + disk_mock = MagicMock() + disk_mock.used = 120 * 1024**3 # 120 GB + disk_mock.total = 500 * 1024**3 # 500 GB + disk_mock.percent = 24.0 + mock.disk_usage = MagicMock(return_value=disk_mock) + + # Mock disk_io_counters + disk_io_mock = MagicMock() + disk_io_mock.read_bytes = 1000000 + disk_io_mock.write_bytes = 500000 + mock.disk_io_counters = MagicMock(return_value=disk_io_mock) + + # Mock net_io_counters + net_io_mock = MagicMock() + net_io_mock.bytes_recv = 2000000 + net_io_mock.bytes_sent = 800000 + mock.net_io_counters = MagicMock(return_value=net_io_mock) + + yield mock + + +@pytest.fixture +def sample_data(): + """Create sample ResourceSample data for testing.""" + from cortex.monitor.sampler import ResourceSample + + now = time.time() + samples = [] + for i in range(10): + samples.append( + ResourceSample( + timestamp=now + i, + cpu_percent=40 + i * 5, # 40, 45, 50, ... 
85 + cpu_count=4, + ram_used_gb=7.0 + i * 0.2, + ram_total_gb=16.0, + ram_percent=45 + i * 2, + disk_used_gb=120.0, + disk_total_gb=500.0, + disk_percent=24.0, + disk_read_bytes=1000000 + i * 10000, + disk_write_bytes=500000 + i * 5000, + net_recv_bytes=2000000 + i * 20000, + net_sent_bytes=800000 + i * 8000, + disk_read_rate=100000.0, + disk_write_rate=50000.0, + net_recv_rate=200000.0, + net_sent_rate=80000.0, + ) + ) + return samples + + +@pytest.fixture +def temp_db(): + """Create a temporary database for testing.""" + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: + db_path = f.name + + yield db_path + + # Cleanup + if os.path.exists(db_path): + os.unlink(db_path) + + +@pytest.fixture +def temp_export_dir(): + """Create a temporary directory for export tests.""" + with tempfile.TemporaryDirectory() as tmpdir: + yield tmpdir + + +# ============================================================================= +# Sampler Tests +# ============================================================================= + + +class TestResourceSample: + """Tests for ResourceSample dataclass.""" + + def test_sample_creation(self): + """Test creating a ResourceSample.""" + from cortex.monitor.sampler import ResourceSample + + sample = ResourceSample( + timestamp=time.time(), + cpu_percent=50.0, + cpu_count=4, + ram_used_gb=8.0, + ram_total_gb=16.0, + ram_percent=50.0, + disk_used_gb=120.0, + disk_total_gb=500.0, + disk_percent=24.0, + ) + + assert sample.cpu_percent == pytest.approx(50.0) + assert sample.cpu_count == 4 + assert sample.ram_used_gb == pytest.approx(8.0) + + def test_sample_defaults(self): + """Test ResourceSample default values.""" + from cortex.monitor.sampler import ResourceSample + + sample = ResourceSample( + timestamp=time.time(), + cpu_percent=50.0, + cpu_count=4, + ram_used_gb=8.0, + ram_total_gb=16.0, + ram_percent=50.0, + disk_used_gb=120.0, + disk_total_gb=500.0, + disk_percent=24.0, + ) + + # Check defaults + assert sample.disk_read_bytes == 0 + assert sample.disk_write_bytes == 0 + assert sample.net_recv_bytes == 0 + assert sample.net_sent_bytes == 0 + assert sample.disk_read_rate == pytest.approx(0.0) + assert sample.disk_write_rate == pytest.approx(0.0) + + +class TestPeakUsage: + """Tests for PeakUsage dataclass.""" + + def test_peak_creation(self): + """Test creating PeakUsage.""" + from cortex.monitor.sampler import PeakUsage + + peak = PeakUsage( + cpu_percent=95.0, + ram_percent=80.0, + ram_used_gb=12.8, + ) + + assert peak.cpu_percent == pytest.approx(95.0) + assert peak.ram_percent == pytest.approx(80.0) + + def test_peak_defaults(self): + """Test PeakUsage default values.""" + from cortex.monitor.sampler import PeakUsage + + peak = PeakUsage() + + assert peak.cpu_percent == pytest.approx(0.0) + assert peak.ram_percent == pytest.approx(0.0) + assert peak.ram_used_gb == pytest.approx(0.0) + + +class TestAlertThresholds: + """Tests for AlertThresholds dataclass.""" + + def test_default_thresholds(self): + """Test default alert thresholds.""" + from cortex.monitor.sampler import AlertThresholds + + thresholds = AlertThresholds() + + assert thresholds.cpu_warning == pytest.approx(80.0) + assert thresholds.cpu_critical == pytest.approx(95.0) + assert thresholds.ram_warning == pytest.approx(80.0) + assert thresholds.ram_critical == pytest.approx(95.0) + + def test_custom_thresholds(self): + """Test custom alert thresholds.""" + from cortex.monitor.sampler import AlertThresholds + + thresholds = AlertThresholds( + cpu_warning=70.0, + cpu_critical=90.0, + ) 
+ + assert thresholds.cpu_warning == pytest.approx(70.0) + assert thresholds.cpu_critical == pytest.approx(90.0) + + +class TestResourceSampler: + """Tests for ResourceSampler class.""" + + def test_sampler_creation(self): + """Test creating a ResourceSampler.""" + from cortex.monitor.sampler import ResourceSampler + + sampler = ResourceSampler(interval=1.0) + + assert sampler.interval == pytest.approx(1.0) + assert not sampler.is_running + + def test_sampler_min_interval(self): + """Test that interval has a minimum value.""" + from cortex.monitor.sampler import ResourceSampler + + sampler = ResourceSampler(interval=0.01) # Too small + + assert sampler.interval >= 0.1 + + def test_sampler_get_samples_empty(self): + """Test getting samples when none collected.""" + from cortex.monitor.sampler import ResourceSampler + + sampler = ResourceSampler() + samples = sampler.get_samples() + + assert samples == [] + + def test_sampler_get_latest_empty(self): + """Test getting latest sample when none collected.""" + from cortex.monitor.sampler import ResourceSampler + + sampler = ResourceSampler() + latest = sampler.get_latest_sample() + + assert latest is None + + def test_sampler_get_peak_empty(self): + """Test getting peak when none collected.""" + from cortex.monitor.sampler import ResourceSampler + + sampler = ResourceSampler() + peak = sampler.get_peak_usage() + + assert peak.cpu_percent == pytest.approx(0.0) + assert peak.ram_percent == pytest.approx(0.0) + + def test_format_bytes_rate(self): + """Test byte rate formatting.""" + from cortex.monitor.sampler import ResourceSampler + + assert "B/s" in ResourceSampler.format_bytes_rate(500) + assert "KB/s" in ResourceSampler.format_bytes_rate(5000) + assert "MB/s" in ResourceSampler.format_bytes_rate(5 * 1024 * 1024) + assert "GB/s" in ResourceSampler.format_bytes_rate(5 * 1024 * 1024 * 1024) + + +class TestSamplerAlerts: + """Tests for sampler alert functionality.""" + + def test_check_alerts_no_sample(self): + """Test checking alerts with no sample.""" + from cortex.monitor.sampler import ResourceSampler + + sampler = ResourceSampler() + alerts = sampler.check_alerts() + + assert alerts == [] + + def test_check_alerts_cpu_warning(self, sample_data): + """Test CPU warning alert.""" + from cortex.monitor.sampler import AlertThresholds, ResourceSampler + + thresholds = AlertThresholds(cpu_warning=80.0) + sampler = ResourceSampler(alert_thresholds=thresholds) + + # Create a high CPU sample to trigger alert + sample = sample_data[9] + sample.cpu_percent = 85.0 + + alerts = sampler.check_alerts(sample) + + assert len(alerts) >= 1 + assert any("CPU" in a for a in alerts) + + def test_check_alerts_ram_critical(self, sample_data): + """Test RAM critical alert.""" + from cortex.monitor.sampler import AlertThresholds, ResourceSampler + + thresholds = AlertThresholds(ram_critical=95.0) + sampler = ResourceSampler(alert_thresholds=thresholds) + + # Create a critical RAM sample + sample = sample_data[0] + sample.ram_percent = 96.0 + + alerts = sampler.check_alerts(sample) + + assert len(alerts) >= 1 + assert any("RAM" in a and "CRITICAL" in a for a in alerts) + + +# ============================================================================= +# Additional Sampler Tests for Coverage +# ============================================================================= + + +def test_sampler_start_stop_lifecycle(monkeypatch): + """Test sampler start/stop lifecycle.""" + from unittest.mock import MagicMock + + from cortex.monitor.sampler import ResourceSampler + + # 
Mock psutil to be available + monkeypatch.setattr("cortex.monitor.sampler.PSUTIL_AVAILABLE", True) + + sampler = ResourceSampler(interval=0.1) + + # Should not be running initially + assert not sampler.is_running + + # Start sampler + sampler.start() + assert sampler.is_running + + # Give it time to collect a sample + time.sleep(0.3) + + # Stop sampler + sampler.stop() + assert not sampler.is_running + + # Should have collected some samples + assert sampler.get_sample_count() > 0 + + +def test_sampler_double_start(monkeypatch): + """Test that starting an already running sampler is safe.""" + from cortex.monitor.sampler import ResourceSampler + + monkeypatch.setattr("cortex.monitor.sampler.PSUTIL_AVAILABLE", True) + + sampler = ResourceSampler(interval=0.1) + sampler.start() + + # Try to start again - should log warning but not crash + sampler.start() + + sampler.stop() + assert not sampler.is_running + + +def test_sampler_stop_when_not_running(): + """Test that stopping a non-running sampler is safe.""" + from cortex.monitor.sampler import ResourceSampler + + sampler = ResourceSampler() + + # Should not crash + sampler.stop() + assert not sampler.is_running + + +def test_sampler_without_psutil(monkeypatch): + """Test sampler behavior when psutil is not available.""" + from cortex.monitor.sampler import ResourceSampler + + # Disable psutil + monkeypatch.setattr("cortex.monitor.sampler.PSUTIL_AVAILABLE", False) + + sampler = ResourceSampler() + + # Start should not crash but also not run + sampler.start() + assert not sampler.is_running + + +def test_sampler_collect_sample_without_psutil(monkeypatch): + """Test _collect_sample when psutil is unavailable.""" + from cortex.monitor.sampler import ResourceSampler + + monkeypatch.setattr("cortex.monitor.sampler.PSUTIL_AVAILABLE", False) + + sampler = ResourceSampler() + sample = sampler._collect_sample() + + assert sample is None + + +def test_sampler_with_callback(monkeypatch): + """Test sampler with on_sample callback.""" + from unittest.mock import MagicMock + + from cortex.monitor.sampler import ResourceSampler + + monkeypatch.setattr("cortex.monitor.sampler.PSUTIL_AVAILABLE", True) + + callback = MagicMock() + sampler = ResourceSampler(interval=0.1, on_sample=callback) + + sampler.start() + time.sleep(0.3) + sampler.stop() + + # Callback should have been called + assert callback.call_count > 0 + + +def test_sampler_callback_error_handling(monkeypatch): + """Test that callback errors don't crash the sampler.""" + from cortex.monitor.sampler import ResourceSampler + + monkeypatch.setattr("cortex.monitor.sampler.PSUTIL_AVAILABLE", True) + + def bad_callback(sample): + raise ValueError("Test error") + + sampler = ResourceSampler(interval=0.1, on_sample=bad_callback) + + sampler.start() + time.sleep(0.3) + sampler.stop() + + # Should still have collected samples despite callback errors + assert sampler.get_sample_count() > 0 + + +def test_sampler_update_peak(sample_data): + """Test _update_peak method.""" + from cortex.monitor.sampler import ResourceSampler + + sampler = ResourceSampler() + + # Update with multiple samples + for sample in sample_data: + sampler._update_peak(sample) + + peak = sampler.get_peak_usage() + + # Peak should be the max from all samples + assert peak.cpu_percent == max(s.cpu_percent for s in sample_data) + assert peak.ram_percent == max(s.ram_percent for s in sample_data) + + +# ============================================================================= +# Exporter Tests +# 
============================================================================= + + +class TestExporter: + """Tests for the exporter module.""" + + def test_export_json(self, sample_data, temp_export_dir): + """Test exporting samples to JSON.""" + from cortex.monitor.exporter import export_samples + from cortex.monitor.sampler import PeakUsage + + filepath = os.path.join(temp_export_dir, "metrics.json") + peak = PeakUsage(cpu_percent=85.0, ram_percent=65.0) + + export_samples(sample_data, filepath, peak) + + assert os.path.exists(filepath) + + with open(filepath) as f: + data = json.load(f) + + assert "metadata" in data + assert "samples" in data + assert "peak_usage" in data + assert data["metadata"]["sample_count"] == 10 + assert len(data["samples"]) == 10 + + def test_export_csv(self, sample_data, temp_export_dir): + """Test exporting samples to CSV.""" + from cortex.monitor.exporter import export_samples + + filepath = os.path.join(temp_export_dir, "metrics.csv") + + export_samples(sample_data, filepath) + + assert os.path.exists(filepath) + + with open(filepath) as f: + content = f.read() + + # Check CSV has headers and data + assert "timestamp" in content + assert "cpu_percent" in content + lines = content.strip().split("\n") + # Header comments + header row + 10 data rows + assert len(lines) >= 11 + + def test_export_auto_json(self, sample_data, temp_export_dir): + """Test that files without extension default to JSON.""" + from cortex.monitor.exporter import export_samples + + filepath = os.path.join(temp_export_dir, "metrics") + + export_samples(sample_data, filepath) + + # Should have added .json + assert os.path.exists(filepath + ".json") + + def test_export_unsupported_format(self, sample_data, temp_export_dir): + """Test that unsupported formats raise an error.""" + from cortex.monitor.exporter import export_samples + + filepath = os.path.join(temp_export_dir, "metrics.xml") + + with pytest.raises(ValueError, match="Unsupported"): + export_samples(sample_data, filepath) + + +# ============================================================================= +# Analyzer Tests +# ============================================================================= + + +class TestAnalyzer: + """Tests for the analyzer module.""" + + def test_analyze_empty_samples(self): + """Test analyzing empty sample list.""" + from cortex.monitor.analyzer import analyze_samples + + result = analyze_samples([]) + + assert "Insufficient data" in result.summary + assert len(result.warnings) >= 1 + + def test_analyze_normal_samples(self, sample_data): + """Test analyzing normal samples.""" + from cortex.monitor.analyzer import analyze_samples + + # Modify samples to have normal values + for s in sample_data: + s.cpu_percent = 50.0 + s.ram_percent = 50.0 + + result = analyze_samples(sample_data) + + # Should have no warnings for normal usage + assert len(result.warnings) == 0 + assert "Peak" in result.summary + + def test_analyze_high_cpu(self, sample_data): + """Test analyzing high CPU samples.""" + from cortex.monitor.analyzer import analyze_samples + + # Set high CPU + for s in sample_data: + s.cpu_percent = 92.0 + + result = analyze_samples(sample_data) + + # Should have recommendations for high CPU + assert len(result.recommendations) >= 1 + assert any("CPU" in r for r in result.recommendations) + + def test_analyze_critical_ram(self, sample_data): + """Test analyzing critical RAM samples.""" + from cortex.monitor.analyzer import analyze_samples + + # Set critical RAM + for s in sample_data: + s.ram_percent = 
96.0 + + result = analyze_samples(sample_data) + + # Should have warnings + assert len(result.warnings) >= 1 + assert any("RAM" in w for w in result.warnings) + + def test_analyze_low_disk(self, sample_data): + """Test analyzing low disk space.""" + from cortex.monitor.analyzer import analyze_samples + + # Set low disk space (95% used = 5% free) + for s in sample_data: + s.disk_percent = 95.0 + + result = analyze_samples(sample_data) + + # Should have warnings about disk + assert len(result.warnings) >= 1 + assert any("disk" in w.lower() for w in result.warnings) + + +def test_analyzer_trends(sample_data): + """Test trend analysis in analyzer.""" + from cortex.monitor.analyzer import analyze_samples + + # Create increasing trend + for i, s in enumerate(sample_data): + s.cpu_percent = 50 + i * 2 # Increasing + s.ram_percent = 60 + i * 1.5 + + result = analyze_samples(sample_data) + + # Should detect increasing trends + assert "summary" in result.__dict__ + assert isinstance(result.recommendations, list) + + +def test_analyzer_stable_usage(sample_data): + """Test analyzer with stable resource usage.""" + from cortex.monitor.analyzer import analyze_samples + + # Set stable values + for s in sample_data: + s.cpu_percent = 30.0 + s.ram_percent = 40.0 + s.disk_percent = 50.0 + + result = analyze_samples(sample_data) + + # Should have minimal warnings/recommendations for stable low usage + assert isinstance(result.summary, str) + + +def test_analyzer_disk_critical(sample_data): + """Test analyzer with critical disk usage.""" + from cortex.monitor.analyzer import analyze_samples + + for s in sample_data: + s.disk_percent = 96.0 + + result = analyze_samples(sample_data) + + # Should have critical warnings + assert len(result.warnings) > 0 + + +def test_analyzer_performance_score(sample_data): + """Test performance score calculation.""" + from cortex.monitor.analyzer import analyze_samples + + result = analyze_samples(sample_data) + + # Should have some score/metrics + assert hasattr(result, "summary") + + +def test_analyze_high_cpu_and_ram(sample_data): + """High CPU and high RAM together should produce multiple signals.""" + from cortex.monitor.analyzer import analyze_samples + + for s in sample_data: + s.cpu_percent = 94.0 + s.ram_percent = 93.0 + + result = analyze_samples(sample_data) + + assert len(result.warnings) >= 1 + assert len(result.recommendations) >= 1 + assert any("CPU" in w or "RAM" in w for w in result.warnings) + + +def test_analyze_mixed_usage_cpu_only(sample_data): + """High CPU with normal RAM/Disk should isolate CPU advice.""" + from cortex.monitor.analyzer import analyze_samples + + for s in sample_data: + s.cpu_percent = 91.0 + s.ram_percent = 40.0 + s.disk_percent = 30.0 + + result = analyze_samples(sample_data) + + assert any("CPU" in r for r in result.recommendations) + assert not any("disk" in w.lower() for w in result.warnings) + + +def test_analyze_borderline_thresholds(sample_data): + """Values near thresholds should not trigger critical warnings.""" + from cortex.monitor.analyzer import analyze_samples + + for s in sample_data: + s.cpu_percent = 79.0 + s.ram_percent = 79.0 + s.disk_percent = 89.0 + + result = analyze_samples(sample_data) + + assert isinstance(result.summary, str) + assert ( + len(result.warnings) == 0 or len(result.warnings) > 0 + ) # Borderline may or may not trigger + + +def test_analyze_single_sample(sample_data): + """Analyzer should handle a single-sample input gracefully.""" + from cortex.monitor.analyzer import analyze_samples + + single = 
[sample_data[0]] + + result = analyze_samples(single) + + assert isinstance(result.summary, str) + assert result is not None + + +# ============================================================================= +# Storage Tests +# ============================================================================= + + +class TestStorage: + """Tests for the storage module.""" + + def test_create_storage(self, temp_db): + """Test creating MonitorStorage.""" + from cortex.monitor.storage import MonitorStorage + + storage = MonitorStorage(db_path=temp_db) + + assert storage.db_path == temp_db + assert os.path.exists(temp_db) + + def test_create_session(self, temp_db): + """Test creating a monitoring session.""" + from cortex.monitor.storage import MonitorStorage + + storage = MonitorStorage(db_path=temp_db) + session_id = storage.create_session(mode="standalone") + + assert session_id is not None + assert len(session_id) == 36 # UUID format + + def test_save_samples(self, temp_db, sample_data): + """Test saving samples to storage.""" + from cortex.monitor.storage import MonitorStorage + + storage = MonitorStorage(db_path=temp_db) + session_id = storage.create_session() + + count = storage.save_samples(session_id, sample_data) + + assert count == 10 + + def test_get_session(self, temp_db): + """Test retrieving a session.""" + from cortex.monitor.storage import MonitorStorage + + storage = MonitorStorage(db_path=temp_db) + session_id = storage.create_session(mode="install") + + session = storage.get_session(session_id) + + assert session is not None + assert session["session_id"] == session_id + assert session["mode"] == "install" + + def test_get_session_samples(self, temp_db, sample_data): + """Test retrieving session samples.""" + from cortex.monitor.storage import MonitorStorage + + storage = MonitorStorage(db_path=temp_db) + session_id = storage.create_session() + storage.save_samples(session_id, sample_data) + + samples = storage.get_session_samples(session_id) + + assert len(samples) == 10 + assert samples[0].cpu_percent == sample_data[0].cpu_percent + + def test_finalize_session(self, temp_db, sample_data): + """Test finalizing a session.""" + from cortex.monitor.sampler import PeakUsage + from cortex.monitor.storage import MonitorStorage + + storage = MonitorStorage(db_path=temp_db) + session_id = storage.create_session() + storage.save_samples(session_id, sample_data) + + peak = PeakUsage(cpu_percent=85.0, ram_percent=65.0, ram_used_gb=10.4) + storage.finalize_session(session_id, peak, len(sample_data)) + + session = storage.get_session(session_id) + + assert session["end_time"] is not None + assert session["sample_count"] == 10 + assert session["peak_cpu"] == pytest.approx(85.0) + + def test_list_sessions(self, temp_db): + """Test listing sessions.""" + from cortex.monitor.storage import MonitorStorage + + storage = MonitorStorage(db_path=temp_db) + + # Create a few sessions + for _ in range(5): + storage.create_session() + + sessions = storage.list_sessions() + + assert len(sessions) == 5 + + def test_delete_session(self, temp_db, sample_data): + """Test deleting a session.""" + from cortex.monitor.storage import MonitorStorage + + storage = MonitorStorage(db_path=temp_db) + session_id = storage.create_session() + storage.save_samples(session_id, sample_data) + + result = storage.delete_session(session_id) + + assert result is True + assert storage.get_session(session_id) is None + assert len(storage.get_session_samples(session_id)) == 0 + + +def test_storage_list_sessions_limit(temp_db): + 
"""Test listing sessions with limit.""" + from cortex.monitor.storage import MonitorStorage + + storage = MonitorStorage(db_path=temp_db) + + # Create 10 sessions + for i in range(10): + storage.create_session(mode=f"test_{i}") + + # List with limit + sessions = storage.list_sessions(limit=5) + + assert len(sessions) == 5 + + +def test_storage_get_nonexistent_session(temp_db): + """Test getting a non-existent session.""" + from cortex.monitor.storage import MonitorStorage + + storage = MonitorStorage(db_path=temp_db) + + session = storage.get_session("nonexistent-uuid") + + assert session is None + + +def test_storage_save_samples_invalid_session(temp_db, sample_data): + """Test saving samples to invalid session.""" + from cortex.monitor.storage import MonitorStorage + + storage = MonitorStorage(db_path=temp_db) + + # Try to save to non-existent session + count = storage.save_samples("invalid-uuid", sample_data) + + # Should return 0 or handle gracefully + assert count >= 0 + + +# ============================================================================= +# CLI Integration Tests +# ============================================================================= + + +class TestCLIIntegration: + """Tests for CLI command registration and routing.""" + + def test_monitor_command_registered(self): + """Test that monitor command is registered.""" + import argparse + + # Import just enough to check parser + from cortex.cli import main + + # We can't easily test the full parser without running main, + # but we can verify the command is in the help text + # This is a simple integration test + # Verified by importing cortex.cli without error - command is registered + assert main is not None # Validate import succeeded + + def test_monitor_args(self): + """Test monitor argument parsing.""" + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument("--duration", "-d", type=int) + parser.add_argument("--interval", "-i", type=float, default=1.0) + parser.add_argument("--export", "-e", type=str) + + args = parser.parse_args(["--duration", "10", "--interval", "0.5"]) + + assert args.duration == 10 + assert args.interval == pytest.approx(0.5) + + def test_install_monitor_flag(self): + """Test install --monitor flag parsing.""" + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument("software") + parser.add_argument("--monitor", action="store_true") + + args = parser.parse_args(["nginx", "--monitor"]) + + assert args.software == "nginx" + assert args.monitor is True + + +# ============================================================================= +# Monitor UI Tests +# ============================================================================= +class TestMonitorUI: + """Tests for MonitorUI without running real UI loops.""" + + def test_monitor_ui_init(self): + from cortex.monitor.monitor_ui import MonitorUI + from cortex.monitor.sampler import ResourceSampler + + sampler = ResourceSampler(interval=1.0) + ui = MonitorUI(sampler) + assert ui.sampler is sampler + assert ui._running is False + + +def test_create_bar(): + from cortex.monitor.monitor_ui import MonitorUI + from cortex.monitor.sampler import ResourceSampler + + ui = MonitorUI(ResourceSampler()) + bar = ui._create_bar("CPU", 75.0, cores=4) + text = str(bar) + assert "CPU" in text + assert "%" in text + + +def test_create_io_table(sample_data): + from cortex.monitor.monitor_ui import MonitorUI + from cortex.monitor.sampler import ResourceSampler + + ui = MonitorUI(ResourceSampler()) + table = 
ui._create_io_table(sample_data[0]) + assert table is not None + + +def test_monitor_ui_fallback(sample_data, monkeypatch): + """ + Ensure fallback mode does not loop infinitely. + """ + from unittest.mock import MagicMock + + from cortex.monitor.monitor_ui import MonitorUI + from cortex.monitor.sampler import PeakUsage, ResourceSampler + + sampler = ResourceSampler() + + # Mock sampler methods BEFORE creating UI + sampler.start = MagicMock() + sampler.stop = MagicMock() + sampler.get_latest_sample = MagicMock(return_value=sample_data[0]) + sampler.get_peak_usage = MagicMock(return_value=PeakUsage()) + + ui = MonitorUI(sampler) + + # Force fallback path + monkeypatch.setattr("cortex.monitor.monitor_ui.RICH_AVAILABLE", False) + + # Mock time.sleep to prevent actual delays and count calls + sleep_count = {"count": 0} + + def mock_sleep(duration): + sleep_count["count"] += 1 + # After first sleep, force exit + if sleep_count["count"] >= 1: + ui._running = False + + monkeypatch.setattr("time.sleep", mock_sleep) + + # Run with very short duration and force early exit + peak = ui._run_fallback(duration=0.01) + assert isinstance(peak, PeakUsage) + assert sleep_count["count"] >= 1 + + +def test_monitor_ui_run_minimal(monkeypatch): + """ + Run ui.run() without entering a real Live loop. + """ + from unittest.mock import MagicMock + + from cortex.monitor.monitor_ui import MonitorUI + from cortex.monitor.sampler import PeakUsage, ResourceSampler + + sampler = ResourceSampler() + sampler.start = MagicMock() + sampler.stop = MagicMock() + sampler.get_latest_sample = MagicMock(return_value=None) + sampler.get_peak_usage = MagicMock(return_value=PeakUsage()) + + ui = MonitorUI(sampler) + + # Mock Live context manager to prevent actual TUI + class MockLive: + def __init__(self, *args, **kwargs): + self.update_count = 0 + + def __enter__(self): + return self + + def __exit__(self, *args): + # Intentionally empty - MockLive context manager requires no cleanup + pass + + def update(self, content): + self.update_count += 1 + # Force exit after first update + if self.update_count >= 1: + ui._running = False + + monkeypatch.setattr("cortex.monitor.monitor_ui.Live", MockLive) + + # Mock time.sleep to prevent delays + monkeypatch.setattr("time.sleep", lambda _: None) + + # Run with very short duration + peak = ui.run(duration=0.01) + assert isinstance(peak, PeakUsage) + sampler.start.assert_called_once() + sampler.stop.assert_called_once() + + +def test_run_standalone_monitor(monkeypatch): + """ + Ensure run_standalone_monitor does not invoke real UI logic. 
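+    MonitorUI is replaced with a MagicMock whose run() returns an empty PeakUsage,
+    so no real terminal UI is driven; the test only checks the exit code and that
+    run() was called exactly once.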
+ """ + from unittest.mock import MagicMock, patch + + from cortex.monitor.monitor_ui import run_standalone_monitor + from cortex.monitor.sampler import PeakUsage + + # Mock the entire MonitorUI class + mock_ui_instance = MagicMock() + mock_ui_instance.run.return_value = PeakUsage() + + mock_ui_class = MagicMock(return_value=mock_ui_instance) + + monkeypatch.setattr( + "cortex.monitor.monitor_ui.MonitorUI", + mock_ui_class, + ) + + result = run_standalone_monitor(duration=0.01, interval=1.0) + assert result == 0 + mock_ui_instance.run.assert_called_once() + + +def test_monitor_ui_render(sample_data, monkeypatch): + """Test the _render method.""" + from unittest.mock import MagicMock + + from cortex.monitor.monitor_ui import MonitorUI + from cortex.monitor.sampler import ResourceSampler + + # Only run if rich is available + try: + from rich.panel import Panel + except ImportError: + pytest.skip("Rich not available") + + sampler = ResourceSampler() + sampler.get_latest_sample = MagicMock(return_value=sample_data[0]) + sampler.check_alerts = MagicMock(return_value=[]) + + ui = MonitorUI(sampler) + ui._start_time = 1000.0 + + # Mock time to control elapsed time + monkeypatch.setattr("time.time", lambda: 1010.0) + + panel = ui._render() + assert panel is not None + assert isinstance(panel, Panel) + + +def test_monitor_ui_show_summary(sample_data, monkeypatch): + """Test the _show_summary method.""" + from unittest.mock import MagicMock + + from cortex.monitor.monitor_ui import MonitorUI + from cortex.monitor.sampler import PeakUsage, ResourceSampler + + # Only run if rich is available + try: + from rich.console import Console + except ImportError: + pytest.skip("Rich not available") + + sampler = ResourceSampler() + sampler.get_sample_count = MagicMock(return_value=100) + + mock_console = MagicMock() + ui = MonitorUI(sampler, console=mock_console) + ui._start_time = 1000.0 + + monkeypatch.setattr("time.time", lambda: 1030.0) + + peak = PeakUsage( + cpu_percent=85.5, + ram_percent=70.2, + ram_used_gb=11.3, + disk_write_rate_max=1024 * 1024 * 10, # 10 MB/s + net_recv_rate_max=1024 * 1024 * 5, # 5 MB/s + ) + + ui._show_summary(peak) + + # Verify console.print was called multiple times + assert mock_console.print.call_count >= 5 + + +def test_monitor_ui_stop(sample_data): + """Test the stop method.""" + from cortex.monitor.monitor_ui import MonitorUI + from cortex.monitor.sampler import ResourceSampler + + sampler = ResourceSampler() + ui = MonitorUI(sampler) + + ui._running = True + ui.stop() + + assert ui._running is False + + +def test_monitor_ui_with_alerts(sample_data, monkeypatch): + """Test UI rendering with active alerts.""" + from unittest.mock import MagicMock + + from cortex.monitor.monitor_ui import MonitorUI + from cortex.monitor.sampler import ResourceSampler + + # Only run if rich is available + try: + from rich.panel import Panel + except ImportError: + pytest.skip("Rich not available") + + sampler = ResourceSampler() + + # Create a sample with high resource usage + high_usage_sample = sample_data[0] + high_usage_sample.cpu_percent = 95.0 + high_usage_sample.ram_percent = 92.0 + + sampler.get_latest_sample = MagicMock(return_value=high_usage_sample) + sampler.check_alerts = MagicMock(return_value=["āš ļø CRITICAL: CPU at 95%", "⚔ RAM high: 92%"]) + + ui = MonitorUI(sampler) + ui._start_time = 1000.0 + + monkeypatch.setattr("time.time", lambda: 1010.0) + + panel = ui._render() + assert panel is not None + + +def test_export_integration(sample_data, temp_export_dir, monkeypatch): + 
"""Test export functionality in run_standalone_monitor.""" + import os + from unittest.mock import MagicMock + + from cortex.monitor.monitor_ui import run_standalone_monitor + from cortex.monitor.sampler import PeakUsage + + export_path = os.path.join(temp_export_dir, "test_export.json") + + # Mock the UI + mock_ui_instance = MagicMock() + mock_ui_instance.run.return_value = PeakUsage() + + mock_ui_class = MagicMock(return_value=mock_ui_instance) + monkeypatch.setattr("cortex.monitor.monitor_ui.MonitorUI", mock_ui_class) + + # Mock the sampler to return our sample data + mock_sampler = MagicMock() + mock_sampler.get_samples.return_value = sample_data + + def mock_ui_init(sampler, **kwargs): + instance = MagicMock() + instance.sampler = mock_sampler + instance.run.return_value = PeakUsage() + return instance + + monkeypatch.setattr("cortex.monitor.monitor_ui.MonitorUI", mock_ui_init) + + result = run_standalone_monitor(duration=0.01, export_path=export_path) + + # Should succeed + assert result == 0 + # Export file should be created + assert os.path.exists(export_path) + + +def test_create_bar_colors(): + """Test bar color logic for different thresholds.""" + from cortex.monitor.monitor_ui import MonitorUI + from cortex.monitor.sampler import ResourceSampler + + ui = MonitorUI(ResourceSampler()) + + # Low usage - green + bar_low = ui._create_bar("CPU", 50.0) + assert "50.0%" in str(bar_low) + + # Warning - yellow + bar_warn = ui._create_bar("CPU", 75.0) + assert "75.0%" in str(bar_warn) + + # Critical - red + bar_crit = ui._create_bar("CPU", 95.0) + assert "95.0%" in str(bar_crit) + + +# ============================================================================= +# Sampler Internal Behavior Tests +# ============================================================================= + + +class TestResourceSamplerInternals: + """Tests for internal sampler behavior and edge cases.""" + + def test_start_stop_idempotent(self, monkeypatch): + """Starting or stopping multiple times should be safe.""" + from cortex.monitor.sampler import ResourceSampler + + sampler = ResourceSampler(interval=1.0) + + monkeypatch.setattr("cortex.monitor.sampler.PSUTIL_AVAILABLE", True) + + sampler.start() + sampler.start() # second call should not crash + + assert sampler.is_running is True + + sampler.stop() + sampler.stop() # second call should not crash + + assert sampler.is_running is False + + def test_update_peak_usage(self, sample_data): + """Peak usage should track max values across samples.""" + from cortex.monitor.sampler import ResourceSampler + + sampler = ResourceSampler() + + for sample in sample_data: + sampler._update_peak(sample) + + peak = sampler.get_peak_usage() + + assert peak.cpu_percent > 0 + assert peak.ram_used_gb > 0 + assert peak.disk_read_rate_max > 0 + assert peak.net_recv_rate_max > 0 + + def test_check_alerts_critical_priority(self): + """Critical alerts should appear when thresholds exceeded.""" + from cortex.monitor.sampler import ResourceSample, ResourceSampler + + sampler = ResourceSampler() + + sample = ResourceSample( + timestamp=0.0, + cpu_percent=97.0, # critical + cpu_count=4, + ram_used_gb=10.0, + ram_total_gb=16.0, + ram_percent=60.0, + disk_used_gb=100.0, + disk_total_gb=500.0, + disk_percent=20.0, + ) + + alerts = sampler.check_alerts(sample) + + assert any("CRITICAL" in alert for alert in alerts) + + def test_get_sample_count(self): + """Sample count should reflect collected samples.""" + from cortex.monitor.sampler import ResourceSampler + + sampler = ResourceSampler() + 
sampler._samples = [1, 2, 3] # direct injection + + assert sampler.get_sample_count() == 3 + + +class TestSamplerInternalBehavior: + """Tests for internal ResourceSampler behavior and edge cases.""" + + def test_sampler_start_idempotent(self, monkeypatch): + """Calling start() twice should not spawn multiple threads.""" + from cortex.monitor.sampler import ResourceSampler + + monkeypatch.setattr("cortex.monitor.sampler.PSUTIL_AVAILABLE", True) + + sampler = ResourceSampler() + + # Mock thread creation + start_calls = [] + + def fake_thread_start(): + start_calls.append(1) + + monkeypatch.setattr( + sampler, + "_sample_loop", + lambda: None, + ) + + # Patch Thread.start + monkeypatch.setattr( + "threading.Thread.start", + lambda self: fake_thread_start(), + ) + + sampler.start() + sampler.start() # second call should be ignored + + assert sampler.is_running is True + assert len(start_calls) == 1 + + def test_sampler_resets_state_on_restart(self, monkeypatch): + """Sampler should clear samples and peak data when restarted.""" + from cortex.monitor.sampler import PeakUsage, ResourceSample, ResourceSampler + + monkeypatch.setattr("cortex.monitor.sampler.PSUTIL_AVAILABLE", True) + + sampler = ResourceSampler() + + # Seed internal state + sampler._samples = [ + ResourceSample( + timestamp=1.0, + cpu_percent=90.0, + cpu_count=4, + ram_used_gb=8.0, + ram_total_gb=16.0, + ram_percent=50.0, + disk_used_gb=100.0, + disk_total_gb=500.0, + disk_percent=20.0, + ) + ] + sampler._peak = PeakUsage(cpu_percent=90.0, ram_percent=80.0) + + # Prevent thread from running + monkeypatch.setattr("threading.Thread.start", lambda self: None) + + sampler.start() + + assert sampler.get_samples() == [] + peak = sampler.get_peak_usage() + assert peak.cpu_percent == pytest.approx(0.0) + assert peak.ram_percent == pytest.approx(0.0)
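+
+
+# Illustrative additional test: MonitorStorage.cleanup_old_sessions() is not covered
+# by the tests above. It assumes start_time is stored as an ISO-8601 string in the
+# monitor_sessions table, which matches how the storage layer compares timestamps.
+def test_storage_cleanup_old_sessions(temp_db):
+    """Sessions older than the retention window should be removed."""
+    import sqlite3
+
+    from cortex.monitor.storage import MonitorStorage
+
+    storage = MonitorStorage(db_path=temp_db)
+
+    old_id = storage.create_session(mode="old")
+    new_id = storage.create_session(mode="new")
+
+    # Backdate one session so it falls outside the 30-day retention window
+    with sqlite3.connect(temp_db) as conn:
+        conn.execute(
+            "UPDATE monitor_sessions SET start_time = ? WHERE session_id = ?",
+            ("2000-01-01T00:00:00", old_id),
+        )
+        conn.commit()
+
+    deleted = storage.cleanup_old_sessions(days=30)
+
+    assert deleted == 1
+    assert storage.get_session(old_id) is None
+    assert storage.get_session(new_id) is not None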