Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions src/cache/cache_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""

import threading
import time
import logging
from typing import Dict, Any, Optional

Expand All @@ -28,7 +29,12 @@ def __init__(self, logger: Optional[logging.Logger] = None) -> None:
'background_hits': 0,
'background_misses': 0,
'total_fetch_time': 0.0,
'fetch_count': 0
'fetch_count': 0,
# Disk cleanup metrics
'last_disk_cleanup': 0.0,
'total_files_cleaned': 0,
'total_space_freed_mb': 0.0,
'last_cleanup_duration_sec': 0.0
}

def record_hit(self, cache_type: str = 'regular') -> None:
Expand Down Expand Up @@ -69,6 +75,21 @@ def record_fetch_time(self, duration: float) -> None:
self._metrics['total_fetch_time'] += duration
self._metrics['fetch_count'] += 1

def record_disk_cleanup(self, files_cleaned: int, space_freed_mb: float, duration_sec: float) -> None:
    """
    Accumulate the results of one completed disk cleanup pass.

    Updates the timestamp of the most recent cleanup, the running totals of
    files removed and space reclaimed, and the duration of the latest pass.

    Args:
        files_cleaned: Number of cache files deleted by this pass.
        space_freed_mb: Megabytes reclaimed by this pass.
        duration_sec: Wall-clock duration of the cleanup pass in seconds.
    """
    with self._lock:
        metrics = self._metrics
        metrics['last_disk_cleanup'] = time.time()
        metrics['total_files_cleaned'] = metrics['total_files_cleaned'] + files_cleaned
        metrics['total_space_freed_mb'] = metrics['total_space_freed_mb'] + space_freed_mb
        metrics['last_cleanup_duration_sec'] = duration_sec

def get_metrics(self) -> Dict[str, Any]:
"""
Get current cache performance metrics.
Expand All @@ -93,7 +114,12 @@ def get_metrics(self) -> Dict[str, Any]:
'api_calls_saved': self._metrics['api_calls_saved'],
'average_fetch_time': avg_fetch_time,
'total_fetch_time': self._metrics['total_fetch_time'],
'fetch_count': self._metrics['fetch_count']
'fetch_count': self._metrics['fetch_count'],
# Disk cleanup metrics
'last_disk_cleanup': self._metrics['last_disk_cleanup'],
'total_files_cleaned': self._metrics['total_files_cleaned'],
'total_space_freed_mb': self._metrics['total_space_freed_mb'],
'last_cleanup_duration_sec': self._metrics['last_cleanup_duration_sec']
}

def log_metrics(self) -> None:
Expand Down
130 changes: 129 additions & 1 deletion src/cache/disk_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,28 @@
import tempfile
import logging
import threading
from typing import Dict, Any, Optional
from typing import Dict, Any, Optional, Protocol
from datetime import datetime

from src.exceptions import CacheError


class CacheStrategyProtocol(Protocol):
    """
    Structural interface for strategy objects that categorize cache keys.

    Any object exposing a compatible ``get_data_type_from_key`` method
    satisfies this protocol; no explicit inheritance is required.
    """

    def get_data_type_from_key(self, key: str) -> str:
        """
        Map a cache key to the data-type label used for strategy lookups.

        Args:
            key: Cache key to categorize.

        Returns:
            Data type string identifying the key's category.
        """
        ...


class DateTimeEncoder(json.JSONEncoder):
"""JSON encoder that handles datetime objects."""
def default(self, obj: Any) -> Any:
Expand Down Expand Up @@ -269,4 +285,116 @@ def clear(self, key: Optional[str] = None) -> None:
def get_cache_dir(self) -> Optional[str]:
    """Return the path of the on-disk cache directory, or None if not configured."""
    directory: Optional[str] = self.cache_dir
    return directory

def cleanup_expired_files(self, cache_strategy: CacheStrategyProtocol, retention_policies: Dict[str, int]) -> Dict[str, Any]:
    """
    Clean up expired cache files based on retention policies.

    Files are aged by modification time. The instance lock is held only
    during the brief directory listing and around each individual deletion,
    so concurrent cache operations are not blocked for the whole scan.

    Args:
        cache_strategy: Object implementing CacheStrategyProtocol for categorizing files
        retention_policies: Dict mapping data types to retention days; the
            'default' entry (falling back to 30) is used for unknown types

    Returns:
        Dictionary with cleanup statistics:
        - files_scanned: Total files checked
        - files_deleted: Files removed
        - space_freed_bytes: Bytes freed
        - errors: Number of errors encountered
    """
    SECONDS_PER_DAY = 86400.0

    stats: Dict[str, Any] = {
        'files_scanned': 0,
        'files_deleted': 0,
        'space_freed_bytes': 0,
        'errors': 0
    }

    if not self.cache_dir or not os.path.exists(self.cache_dir):
        self.logger.warning("Cache directory not available for cleanup")
        return stats

    current_time = time.time()

    # Take a snapshot of the directory while briefly holding the lock; the
    # files are then processed outside the lock so get/set operations are
    # not blocked for the duration of the scan.
    try:
        with self._lock:
            filenames = [f for f in os.listdir(self.cache_dir) if f.endswith('.json')]
    except OSError as list_error:
        self.logger.error("Error listing cache directory %s: %s", self.cache_dir, list_error, exc_info=True)
        stats['errors'] += 1
        return stats

    for filename in filenames:
        stats['files_scanned'] += 1
        file_path = os.path.join(self.cache_dir, filename)

        try:
            # stat outside the lock; a FileNotFoundError here just means the
            # file vanished between listing and processing (benign race).
            file_age_days = (current_time - os.path.getmtime(file_path)) / SECONDS_PER_DAY

            cache_key = filename[:-5]  # strip the '.json' extension
            data_type = cache_strategy.get_data_type_from_key(cache_key)
            retention_days = retention_policies.get(data_type, retention_policies.get('default', 30))

            if file_age_days <= retention_days:
                continue

            # Hold the lock only around the size lookup + unlink so the pair
            # is atomic with respect to this process's other cache writers.
            try:
                with self._lock:
                    file_size = os.path.getsize(file_path)
                    os.remove(file_path)
            except FileNotFoundError:
                # Deleted by another process in the meantime: benign race.
                continue
            except OSError as delete_error:
                # Other file system errors: log but don't fail the cleanup.
                stats['errors'] += 1
                self.logger.warning("Error deleting cache file %s: %s", filename, delete_error)
                continue

            # Only count the file once removal actually succeeded.
            stats['files_deleted'] += 1
            stats['space_freed_bytes'] += file_size
            self.logger.debug(
                "Deleted expired cache file: %s (age: %.1f days, type: %s, retention: %d days)",
                filename, file_age_days, data_type, retention_days
            )

        except FileNotFoundError:
            # File was deleted by another process between listing and stat.
            continue
        except OSError as e:
            stats['errors'] += 1
            self.logger.warning("Error processing cache file %s: %s", filename, e)
            continue
        except Exception as e:
            stats['errors'] += 1
            self.logger.error("Unexpected error processing cache file %s: %s", filename, e, exc_info=True)
            continue

    return stats

Loading