Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 49 additions & 30 deletions backend/app/services/analytics_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import hashlib
import json
import logging
import time
from typing import Any, Callable, Optional, TypeVar

import redis
Expand All @@ -21,51 +22,59 @@

# Module-level Redis connection state: a singleton client plus a time-based
# retry backoff so a down Redis is not re-probed on every request.
_redis_client: Optional[redis.Redis] = None
# Monotonic deadline; reconnect attempts are skipped until this time passes.
_redis_unavailable_until: float = 0
# Seconds to wait after a connection failure before retrying.
_RETRY_BACKOFF_SECONDS = 30


def get_redis_client() -> Optional[redis.Redis]:
    """
    Get or create the module-level Redis client.

    Returns:
        The shared ``redis.Redis`` client, or ``None`` when Redis is not
        configured, or when a recent connection attempt failed and the
        retry backoff window has not yet expired.

    Uses lazy initialization with time-based retry on failure.
    """
    global _redis_client, _redis_unavailable_until

    # Caching is optional: with no configured URL we silently no-op.
    if not settings.redis_url:
        logger.debug("Redis URL not configured, caching disabled")
        return None

    # If a recent attempt failed, skip until the backoff expires so a down
    # Redis does not add connect-timeout latency to every request.
    if _redis_unavailable_until > time.monotonic():
        return None

    # Reuse the already-established client.
    if _redis_client is not None:
        return _redis_client

    try:
        _redis_client = redis.from_url(
            settings.redis_url,
            decode_responses=True,
            socket_connect_timeout=2,
            socket_timeout=2,
            health_check_interval=15,
            retry_on_timeout=False,
        )
        # Fail fast: verify the connection before handing the client out.
        _redis_client.ping()
        _redis_unavailable_until = 0
        logger.info("Redis connection established for caching")
        return _redis_client
    except Exception as e:
        # Broad catch is deliberate: any failure here means "cache
        # unavailable", never an error surfaced to the caller.
        logger.warning(f"Redis connection failed, will retry in {_RETRY_BACKOFF_SECONDS}s: {e}")
        _redis_unavailable_until = time.monotonic() + _RETRY_BACKOFF_SECONDS
        _redis_client = None
        return None


def _mark_redis_unavailable() -> None:
    """Drop the cached client and open a backoff window, so subsequent
    requests skip reconnect attempts instead of retrying every time."""
    global _redis_client, _redis_unavailable_until
    # Record the earliest moment a reconnect may be attempted again.
    _redis_unavailable_until = time.monotonic() + _RETRY_BACKOFF_SECONDS
    # Discard the (now dead) client; it will be rebuilt after the backoff.
    _redis_client = None


def _generate_cache_key(prefix: str, *args: Any, **kwargs: Any) -> str:
"""
Generate a cache key from prefix and arguments.
Expand Down Expand Up @@ -126,24 +135,30 @@ def wrapper(*args: Any, **kwargs: Any) -> T:
if cached is not None:
logger.debug(f"Cache hit for {cache_key}")
return json.loads(cached)
except (redis.ConnectionError, redis.TimeoutError) as e:
logger.warning(f"Redis connection lost during cache read: {e}")
_mark_redis_unavailable()
except Exception as e:
logger.warning(f"Cache read failed for {cache_key}: {e}")

# Execute function
result = func(*args, **kwargs)

try:
# Store in cache
# Convert Pydantic models to dict for JSON serialization
if hasattr(result, "model_dump"):
cache_value = json.dumps(result.model_dump(mode="json"))
else:
cache_value = json.dumps(result)

client.setex(cache_key, ttl_seconds, cache_value)
logger.debug(f"Cache set for {cache_key} (TTL: {ttl_seconds}s)")
except Exception as e:
logger.warning(f"Cache write failed for {cache_key}: {e}")
write_client = get_redis_client()
if write_client is not None:
try:
if hasattr(result, "model_dump"):
cache_value = json.dumps(result.model_dump(mode="json"))
else:
cache_value = json.dumps(result)

write_client.setex(cache_key, ttl_seconds, cache_value)
logger.debug(f"Cache set for {cache_key} (TTL: {ttl_seconds}s)")
except (redis.ConnectionError, redis.TimeoutError) as e:
logger.warning(f"Redis connection lost during cache write: {e}")
_mark_redis_unavailable()
except Exception as e:
logger.warning(f"Cache write failed for {cache_key}: {e}")

return result

Expand Down Expand Up @@ -181,6 +196,10 @@ def invalidate_analytics_cache(org_id: Optional[str] = None) -> int:
logger.info(f"Invalidated {deleted} analytics cache entries")
return deleted
return 0
except (redis.ConnectionError, redis.TimeoutError) as e:
logger.warning(f"Redis connection lost during cache invalidation: {e}")
_mark_redis_unavailable()
return 0
except Exception as e:
logger.warning(f"Cache invalidation failed: {e}")
return 0
Expand All @@ -192,6 +211,6 @@ def reset_redis_connection() -> None:

Useful for testing or when Redis becomes available after being down.
"""
global _redis_client, _redis_available
global _redis_client, _redis_unavailable_until
_redis_client = None
_redis_available = None
_redis_unavailable_until = 0
Loading