diff --git a/src/cache/cache_metrics.py b/src/cache/cache_metrics.py index 3b39543dc..6abd7084a 100644 --- a/src/cache/cache_metrics.py +++ b/src/cache/cache_metrics.py @@ -5,6 +5,7 @@ """ import threading +import time import logging from typing import Dict, Any, Optional @@ -28,7 +29,12 @@ def __init__(self, logger: Optional[logging.Logger] = None) -> None: 'background_hits': 0, 'background_misses': 0, 'total_fetch_time': 0.0, - 'fetch_count': 0 + 'fetch_count': 0, + # Disk cleanup metrics + 'last_disk_cleanup': 0.0, + 'total_files_cleaned': 0, + 'total_space_freed_mb': 0.0, + 'last_cleanup_duration_sec': 0.0 } def record_hit(self, cache_type: str = 'regular') -> None: @@ -69,6 +75,21 @@ def record_fetch_time(self, duration: float) -> None: self._metrics['total_fetch_time'] += duration self._metrics['fetch_count'] += 1 + def record_disk_cleanup(self, files_cleaned: int, space_freed_mb: float, duration_sec: float) -> None: + """ + Record disk cleanup operation results. + + Args: + files_cleaned: Number of files deleted + space_freed_mb: Space freed in megabytes + duration_sec: Duration of cleanup operation in seconds + """ + with self._lock: + self._metrics['last_disk_cleanup'] = time.time() + self._metrics['total_files_cleaned'] += files_cleaned + self._metrics['total_space_freed_mb'] += space_freed_mb + self._metrics['last_cleanup_duration_sec'] = duration_sec + def get_metrics(self) -> Dict[str, Any]: """ Get current cache performance metrics. @@ -93,7 +114,12 @@ def get_metrics(self) -> Dict[str, Any]: 'api_calls_saved': self._metrics['api_calls_saved'], 'average_fetch_time': avg_fetch_time, 'total_fetch_time': self._metrics['total_fetch_time'], - 'fetch_count': self._metrics['fetch_count'] + 'fetch_count': self._metrics['fetch_count'], + # Disk cleanup metrics + 'last_disk_cleanup': self._metrics['last_disk_cleanup'], + 'total_files_cleaned': self._metrics['total_files_cleaned'], + 'total_space_freed_mb': self._metrics['total_space_freed_mb'], + 'last_cleanup_duration_sec': self._metrics['last_cleanup_duration_sec'] } def log_metrics(self) -> None: diff --git a/src/cache/disk_cache.py b/src/cache/disk_cache.py index 69f8ab0a0..037688421 100644 --- a/src/cache/disk_cache.py +++ b/src/cache/disk_cache.py @@ -10,12 +10,28 @@ import tempfile import logging import threading -from typing import Dict, Any, Optional +from typing import Dict, Any, Optional, Protocol from datetime import datetime from src.exceptions import CacheError +class CacheStrategyProtocol(Protocol): + """Protocol for cache strategy objects that categorize cache keys.""" + + def get_data_type_from_key(self, key: str) -> str: + """ + Determine the data type from a cache key. + + Args: + key: Cache key + + Returns: + Data type string for strategy lookup + """ + ... + + class DateTimeEncoder(json.JSONEncoder): """JSON encoder that handles datetime objects.""" def default(self, obj: Any) -> Any: @@ -269,4 +285,116 @@ def clear(self, key: Optional[str] = None) -> None: def get_cache_dir(self) -> Optional[str]: """Get the cache directory path.""" return self.cache_dir + + def cleanup_expired_files(self, cache_strategy: CacheStrategyProtocol, retention_policies: Dict[str, int]) -> Dict[str, Any]: + """ + Clean up expired cache files based on retention policies. + + Args: + cache_strategy: Object implementing CacheStrategyProtocol for categorizing files + retention_policies: Dict mapping data types to retention days + + Returns: + Dictionary with cleanup statistics: + - files_scanned: Total files checked + - files_deleted: Files removed + - space_freed_bytes: Bytes freed + - errors: Number of errors encountered + """ + if not self.cache_dir or not os.path.exists(self.cache_dir): + self.logger.warning("Cache directory not available for cleanup") + return {'files_scanned': 0, 'files_deleted': 0, 'space_freed_bytes': 0, 'errors': 0} + + stats = { + 'files_scanned': 0, + 'files_deleted': 0, + 'space_freed_bytes': 0, + 'errors': 0 + } + + current_time = time.time() + + try: + # Collect files to process outside the lock to avoid blocking cache operations + # Only hold lock during directory listing to get snapshot of files + try: + with self._lock: + # Get snapshot of files while holding lock briefly + filenames = [f for f in os.listdir(self.cache_dir) if f.endswith('.json')] + except OSError as list_error: + self.logger.error("Error listing cache directory %s: %s", self.cache_dir, list_error, exc_info=True) + stats['errors'] += 1 + return stats + + # Process files outside the lock to avoid blocking get/set operations + for filename in filenames: + stats['files_scanned'] += 1 + file_path = os.path.join(self.cache_dir, filename) + + try: + # Get file age (outside lock - stat operations are generally atomic) + file_mtime = os.path.getmtime(file_path) + file_age_days = (current_time - file_mtime) / 86400 # Convert to days + + # Extract cache key from filename (remove .json extension) + cache_key = filename[:-5] + + # Determine data type and retention policy + data_type = cache_strategy.get_data_type_from_key(cache_key) + retention_days = retention_policies.get(data_type, retention_policies.get('default', 30)) + + # Delete if older than retention period + # Only hold lock during actual file deletion to ensure atomicity + if file_age_days > retention_days: + try: + # Hold lock only during delete operation (get size and remove atomically) + with self._lock: + # Double-check file still exists (may have been deleted by another process) + if os.path.exists(file_path): + try: + file_size = os.path.getsize(file_path) + os.remove(file_path) + # Only increment stats if removal succeeded + stats['files_deleted'] += 1 + stats['space_freed_bytes'] += file_size + self.logger.debug( + "Deleted expired cache file: %s (age: %.1f days, type: %s, retention: %d days)", + filename, file_age_days, data_type, retention_days + ) + except FileNotFoundError: + # File was deleted by another process between exists check and remove + # This is a benign race condition, silently continue + pass + else: + # File was deleted by another process before lock was acquired + # This is a benign race condition, silently continue + pass + except FileNotFoundError: + # File was already deleted by another process, skip it + # This is a benign race condition, silently continue + continue + except OSError as e: + # Other file system errors, log but don't fail the entire cleanup + stats['errors'] += 1 + self.logger.warning("Error deleting cache file %s: %s", filename, e) + continue + + except FileNotFoundError: + # File was deleted by another process between listing and processing + # This is a benign race condition, silently continue + continue + except OSError as e: + stats['errors'] += 1 + self.logger.warning("Error processing cache file %s: %s", filename, e) + continue + except Exception as e: + stats['errors'] += 1 + self.logger.error("Unexpected error processing cache file %s: %s", filename, e, exc_info=True) + continue + + except OSError as e: + self.logger.error("Error listing cache directory %s: %s", self.cache_dir, e, exc_info=True) + stats['errors'] += 1 + + return stats diff --git a/src/cache_manager.py b/src/cache_manager.py index 92c8e86eb..9b1335fdf 100644 --- a/src/cache_manager.py +++ b/src/cache_manager.py @@ -59,6 +59,31 @@ def __init__(self) -> None: self._max_memory_cache_size = self._memory_cache_component._max_size self._memory_cache_cleanup_interval = self._memory_cache_component._cleanup_interval self._last_memory_cache_cleanup = self._memory_cache_component._last_cleanup + + # Disk cleanup configuration + self._disk_cleanup_interval_hours = 24 # Run cleanup every 24 hours + self._disk_cleanup_interval = 3600.0 # Minimum interval between cleanups (1 hour) for throttle + self._last_disk_cleanup = 0.0 # Timestamp of last disk cleanup + self._cleanup_thread: Optional[threading.Thread] = None + self._cleanup_stop_event = threading.Event() # Event to signal thread shutdown + self._retention_policies = { + 'odds': 2, # Odds data: 2 days (lines move frequently) + 'odds_live': 2, # Live odds: 2 days + 'sports_live': 7, # Live sports: 7 days + 'weather_current': 7, # Current weather: 7 days + 'sports_recent': 7, # Recent games: 7 days + 'news': 14, # News: 14 days + 'sports_upcoming': 60, # Upcoming games: 60 days (schedules stable) + 'sports_schedules': 60, # Schedules: 60 days + 'team_info': 60, # Team info: 60 days + 'stocks': 14, # Stock data: 14 days + 'crypto': 14, # Crypto data: 14 days + 'default': 30 # Default: 30 days + } + + # Start background cleanup thread only if disk caching is enabled + if self.cache_dir: + self.start_cleanup_thread() def _get_writable_cache_dir(self) -> Optional[str]: """Tries to find or create a writable cache directory, preferring a system path when available.""" @@ -555,7 +580,150 @@ def setup_persistent_cache(self) -> bool: except (OSError, IOError, PermissionError) as e: self.logger.error(f"Failed to set up persistent cache directory {cache_dir}: {e}", exc_info=True) - return False + return False + + def cleanup_disk_cache(self, force: bool = False) -> Dict[str, Any]: + """ + Clean up expired disk cache files based on retention policies. + + Args: + force: If True, run cleanup regardless of last cleanup time + + Returns: + Dictionary with cleanup statistics + """ + now = time.time() + + # Check if cleanup is needed (throttle to prevent too-frequent cleanups) + if not force and (now - self._last_disk_cleanup) < self._disk_cleanup_interval: + return { + 'files_scanned': 0, + 'files_deleted': 0, + 'space_freed_mb': 0.0, + 'errors': 0, + 'duration_sec': 0.0 + } + + start_time = time.time() + + try: + # Perform cleanup + stats = self._disk_cache_component.cleanup_expired_files( + cache_strategy=self._strategy_component, + retention_policies=self._retention_policies + ) + + duration = time.time() - start_time + space_freed_mb = stats['space_freed_bytes'] / (1024 * 1024) + + # Record metrics + self._metrics_component.record_disk_cleanup( + files_cleaned=stats['files_deleted'], + space_freed_mb=space_freed_mb, + duration_sec=duration + ) + + # Log summary + if stats['files_deleted'] > 0: + self.logger.info( + "Disk cache cleanup completed: %d/%d files deleted, %.2f MB freed, %d errors, took %.2fs", + stats['files_deleted'], stats['files_scanned'], space_freed_mb, + stats['errors'], duration + ) + else: + self.logger.debug( + "Disk cache cleanup completed: no files to delete (%d files scanned)", + stats['files_scanned'] + ) + + # Update last cleanup time + self._last_disk_cleanup = time.time() + + return { + 'files_scanned': stats['files_scanned'], + 'files_deleted': stats['files_deleted'], + 'space_freed_mb': space_freed_mb, + 'errors': stats['errors'], + 'duration_sec': duration + } + + except Exception as e: + self.logger.error("Error during disk cache cleanup: %s", e, exc_info=True) + return { + 'files_scanned': 0, + 'files_deleted': 0, + 'space_freed_mb': 0.0, + 'errors': 1, + 'duration_sec': time.time() - start_time + } + + def start_cleanup_thread(self) -> None: + """Start background thread for periodic disk cache cleanup.""" + if self._cleanup_thread and self._cleanup_thread.is_alive(): + self.logger.debug("Cleanup thread already running") + return + + def cleanup_loop(): + """Background loop that runs cleanup periodically.""" + self.logger.info("Disk cache cleanup thread started (interval: %d hours)", + self._disk_cleanup_interval_hours) + + # Run initial cleanup on startup (deferred from __init__ to avoid blocking) + try: + self.logger.debug("Running initial disk cache cleanup") + self.cleanup_disk_cache() + except Exception as e: + self.logger.error("Error in initial cleanup: %s", e, exc_info=True) + + # Main cleanup loop + while not self._cleanup_stop_event.is_set(): + try: + # Sleep for the configured interval (interruptible) + sleep_seconds = self._disk_cleanup_interval_hours * 3600 + if self._cleanup_stop_event.wait(timeout=sleep_seconds): + # Event was set, exit loop + break + + # Run cleanup if not stopped + if not self._cleanup_stop_event.is_set(): + self.logger.debug("Running scheduled disk cache cleanup") + self.cleanup_disk_cache() + + except Exception as e: + self.logger.error("Error in cleanup thread: %s", e, exc_info=True) + # Continue running despite errors, but use interruptible sleep + if self._cleanup_stop_event.wait(timeout=60): + # Event was set during error recovery sleep, exit loop + break + + self.logger.info("Disk cache cleanup thread stopped") + + self._cleanup_stop_event.clear() # Reset event before starting thread + self._cleanup_thread = threading.Thread(target=cleanup_loop, daemon=True, name="DiskCacheCleanup") + self._cleanup_thread.start() + self.logger.info("Started disk cache cleanup background thread") + + def stop_cleanup_thread(self) -> None: + """ + Stop the background cleanup thread gracefully. + + Signals the thread to stop and waits for it to finish (with timeout). + This allows for clean shutdown during testing or application termination. + """ + if not self._cleanup_thread or not self._cleanup_thread.is_alive(): + self.logger.debug("Cleanup thread not running") + return + + self.logger.info("Stopping disk cache cleanup thread...") + self._cleanup_stop_event.set() # Signal thread to stop + + # Wait for thread to finish (with timeout to avoid hanging) + self._cleanup_thread.join(timeout=5.0) + + if self._cleanup_thread.is_alive(): + self.logger.warning("Cleanup thread did not stop within timeout, thread may still be running") + else: + self.logger.info("Disk cache cleanup thread stopped successfully") def get_sport_live_interval(self, sport_key: str) -> int: """ @@ -685,56 +853,23 @@ def generate_sport_cache_key(self, sport: str, date_str: Optional[str] = None) - def record_cache_hit(self, cache_type: str = 'regular') -> None: """Record a cache hit for performance monitoring.""" - with self._cache_lock: - if cache_type == 'background': - self._cache_metrics['background_hits'] += 1 - else: - self._cache_metrics['hits'] += 1 + self._metrics_component.record_hit(cache_type) def record_cache_miss(self, cache_type: str = 'regular') -> None: """Record a cache miss for performance monitoring.""" - with self._cache_lock: - if cache_type == 'background': - self._cache_metrics['background_misses'] += 1 - else: - self._cache_metrics['misses'] += 1 - self._cache_metrics['api_calls_saved'] += 1 + self._metrics_component.record_miss(cache_type) def record_fetch_time(self, duration: float) -> None: """Record fetch operation duration for performance monitoring.""" - with self._cache_lock: - self._cache_metrics['total_fetch_time'] += duration - self._cache_metrics['fetch_count'] += 1 + self._metrics_component.record_fetch_time(duration) def get_cache_metrics(self) -> Dict[str, Any]: """Get current cache performance metrics.""" - with self._cache_lock: - total_hits = self._cache_metrics['hits'] + self._cache_metrics['background_hits'] - total_misses = self._cache_metrics['misses'] + self._cache_metrics['background_misses'] - total_requests = total_hits + total_misses - - avg_fetch_time = (self._cache_metrics['total_fetch_time'] / - self._cache_metrics['fetch_count']) if self._cache_metrics['fetch_count'] > 0 else 0.0 - - return { - 'total_requests': total_requests, - 'cache_hit_rate': total_hits / total_requests if total_requests > 0 else 0.0, - 'background_hit_rate': (self._cache_metrics['background_hits'] / - (self._cache_metrics['background_hits'] + self._cache_metrics['background_misses']) - if (self._cache_metrics['background_hits'] + self._cache_metrics['background_misses']) > 0 else 0.0), - 'api_calls_saved': self._cache_metrics['api_calls_saved'], - 'average_fetch_time': avg_fetch_time, - 'total_fetch_time': self._cache_metrics['total_fetch_time'], - 'fetch_count': self._cache_metrics['fetch_count'] - } + return self._metrics_component.get_metrics() def log_cache_metrics(self) -> None: """Log current cache performance metrics.""" - metrics = self.get_cache_metrics() - self.logger.info(f"Cache Performance - Hit Rate: {metrics['cache_hit_rate']:.2%}, " - f"Background Hit Rate: {metrics['background_hit_rate']:.2%}, " - f"API Calls Saved: {metrics['api_calls_saved']}, " - f"Avg Fetch Time: {metrics['average_fetch_time']:.2f}s") + self._metrics_component.log_metrics() def get_memory_cache_stats(self) -> Dict[str, Any]: """