from __future__ import annotations import json from typing import Any, Optional from core.settings import REDIS_DEFAULT_TTL, REDIS_ENABLED, REDIS_PREFIX, REDIS_URL try: from redis import Redis except Exception: # pragma: no cover Redis = None # type: ignore class RedisCache: def __init__(self, *, prefix_override: Optional[str] = None, default_ttl_override: Optional[int] = None): self.prefix = str(prefix_override or REDIS_PREFIX).strip() or REDIS_PREFIX self.default_ttl = int(default_ttl_override if default_ttl_override is not None else REDIS_DEFAULT_TTL) self.enabled = False self.status = "disabled" self.status_detail = "" self._client: Optional["Redis"] = None if not REDIS_ENABLED: return if not REDIS_URL: self.status = "missing_url" return if Redis is None: self.status = "client_unavailable" self.status_detail = "redis python package is not installed" return try: self._client = Redis.from_url(REDIS_URL, decode_responses=True) self._client.ping() self.enabled = True self.status = "connected" except Exception as exc: self.enabled = False self._client = None self.status = "connection_failed" self.status_detail = str(exc or "").strip()[:200] def _full_key(self, key: str) -> str: return f"{self.prefix}:{key}" def ping(self) -> bool: if not self.enabled or self._client is None: return False try: return bool(self._client.ping()) except Exception: return False def get(self, key: str) -> Optional[str]: if not self.enabled or self._client is None: return None try: return self._client.get(self._full_key(key)) except Exception: return None def set(self, key: str, value: str, ttl: Optional[int] = None) -> None: if not self.enabled or self._client is None: return try: ttl_seconds = int(ttl if ttl is not None else self.default_ttl) self._client.setex(self._full_key(key), ttl_seconds, str(value)) except Exception: return def get_json(self, key: str) -> Any: if not self.enabled or self._client is None: return None try: raw = self.get(key) if not raw: return None return json.loads(raw) except Exception: return None def set_json(self, key: str, value: Any, ttl: Optional[int] = None) -> None: if not self.enabled or self._client is None: return try: self.set(key, json.dumps(value, ensure_ascii=False, default=str), ttl=ttl) except Exception: return def sadd(self, key: str, *members: str) -> None: if not self.enabled or self._client is None: return normalized = [str(member or "").strip() for member in members if str(member or "").strip()] if not normalized: return try: self._client.sadd(self._full_key(key), *normalized) except Exception: return def srem(self, key: str, *members: str) -> None: if not self.enabled or self._client is None: return normalized = [str(member or "").strip() for member in members if str(member or "").strip()] if not normalized: return try: self._client.srem(self._full_key(key), *normalized) except Exception: return def smembers(self, key: str) -> set[str]: if not self.enabled or self._client is None: return set() try: rows = self._client.smembers(self._full_key(key)) return {str(row or "").strip() for row in rows if str(row or "").strip()} except Exception: return set() def expire(self, key: str, ttl: int) -> None: if not self.enabled or self._client is None: return try: self._client.expire(self._full_key(key), max(1, int(ttl))) except Exception: return def delete(self, *keys: str) -> None: if not self.enabled or self._client is None: return full_keys = [self._full_key(key) for key in keys if str(key or "").strip()] if not full_keys: return try: self._client.delete(*full_keys) except Exception: return def delete_prefix(self, prefix: str) -> None: if not self.enabled or self._client is None: return pattern = self._full_key(f"{prefix}*") try: cursor = 0 while True: cursor, rows = self._client.scan(cursor=cursor, match=pattern, count=200) if rows: self._client.delete(*rows) if cursor == 0: break except Exception: return cache = RedisCache() auth_cache = RedisCache(prefix_override=f"{REDIS_PREFIX}_auth")