125 lines
4.4 KiB
Python
125 lines
4.4 KiB
Python
"""Services package."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import logging
|
|
from typing import Any, Optional, Dict
|
|
|
|
try:
|
|
import redis # type: ignore
|
|
except Exception: # pragma: no cover
|
|
redis = None # type: ignore
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class RedisCache:
|
|
"""Lightweight Redis cache wrapper with graceful fallback."""
|
|
|
|
def __init__(self, url_env: str = "REDIS_URL", default_ttl_seconds: Optional[int] = None) -> None:
|
|
# Allow TTL to be configurable via env var (default 300s = 5 min, or 86400 = 24h)
|
|
ttl_env = os.getenv("REDIS_CACHE_TTL_SECONDS")
|
|
if default_ttl_seconds is None:
|
|
default_ttl_seconds = int(ttl_env) if ttl_env else 300
|
|
|
|
self.default_ttl_seconds = default_ttl_seconds
|
|
self._enabled = False
|
|
self._client = None
|
|
self._stats = {"hits": 0, "misses": 0, "sets": 0}
|
|
|
|
url = os.getenv(url_env)
|
|
if not url or redis is None:
|
|
logger.warning("Redis not configured or client unavailable; caching disabled")
|
|
return
|
|
try:
|
|
self._client = redis.Redis.from_url(url, decode_responses=True)
|
|
self._client.ping()
|
|
self._enabled = True
|
|
logger.info(f"Redis cache connected (TTL: {self.default_ttl_seconds}s)")
|
|
except Exception as exc:
|
|
logger.warning(f"Redis connection failed: {exc}; caching disabled")
|
|
self._enabled = False
|
|
self._client = None
|
|
|
|
@property
|
|
def enabled(self) -> bool:
|
|
return self._enabled and self._client is not None
|
|
|
|
def get_json(self, key: str) -> Optional[Any]:
|
|
if not self.enabled:
|
|
self._stats["misses"] += 1
|
|
return None
|
|
try:
|
|
raw = self._client.get(key) # type: ignore[union-attr]
|
|
if raw:
|
|
self._stats["hits"] += 1
|
|
return json.loads(raw)
|
|
else:
|
|
self._stats["misses"] += 1
|
|
return None
|
|
except Exception as exc:
|
|
logger.debug(f"Redis get_json error for key={key}: {exc}")
|
|
self._stats["misses"] += 1
|
|
return None
|
|
|
|
def set_json(self, key: str, value: Any, ttl_seconds: Optional[int] = None) -> None:
|
|
if not self.enabled:
|
|
return
|
|
try:
|
|
payload = json.dumps(value, default=lambda o: getattr(o, "model_dump", lambda: o)())
|
|
ttl = ttl_seconds if ttl_seconds is not None else self.default_ttl_seconds
|
|
# Use -1 for no expiration, otherwise use setex
|
|
if ttl > 0:
|
|
self._client.setex(key, ttl, payload) # type: ignore[union-attr]
|
|
else:
|
|
self._client.set(key, payload) # type: ignore[union-attr]
|
|
self._stats["sets"] += 1
|
|
except Exception as exc:
|
|
logger.debug(f"Redis set_json error for key={key}: {exc}")
|
|
|
|
def delete(self, pattern: str) -> int:
|
|
"""Delete keys matching pattern (e.g., 'routes:*'). Returns count deleted."""
|
|
if not self.enabled:
|
|
return 0
|
|
try:
|
|
keys = list(self._client.scan_iter(match=pattern)) # type: ignore[union-attr]
|
|
if keys:
|
|
return self._client.delete(*keys) # type: ignore[union-attr]
|
|
return 0
|
|
except Exception as exc:
|
|
logger.error(f"Redis delete error for pattern={pattern}: {exc}")
|
|
return 0
|
|
|
|
def get_stats(self) -> Dict[str, Any]:
|
|
"""Get cache statistics."""
|
|
stats = self._stats.copy()
|
|
if self.enabled:
|
|
try:
|
|
# Count cache keys
|
|
route_keys = list(self._client.scan_iter(match="routes:*")) # type: ignore[union-attr]
|
|
stats["total_keys"] = len(route_keys)
|
|
stats["enabled"] = True
|
|
except Exception:
|
|
stats["total_keys"] = 0
|
|
stats["enabled"] = True
|
|
else:
|
|
stats["total_keys"] = 0
|
|
stats["enabled"] = False
|
|
return stats
|
|
|
|
def get_keys(self, pattern: str = "routes:*") -> list[str]:
|
|
"""Get list of cache keys matching pattern."""
|
|
if not self.enabled:
|
|
return []
|
|
try:
|
|
return list(self._client.scan_iter(match=pattern)) # type: ignore[union-attr]
|
|
except Exception as exc:
|
|
logger.error(f"Redis get_keys error for pattern={pattern}: {exc}")
|
|
return []
|
|
|
|
|
|
# Singleton cache instance for app
|
|
cache = RedisCache()
|