diff --git a/backend/api/content_planning/api/content_strategy/endpoints/streaming_endpoints.py b/backend/api/content_planning/api/content_strategy/endpoints/streaming_endpoints.py index ffd2606b..a5117634 100644 --- a/backend/api/content_planning/api/content_strategy/endpoints/streaming_endpoints.py +++ b/backend/api/content_planning/api/content_strategy/endpoints/streaming_endpoints.py @@ -12,11 +12,12 @@ from loguru import logger import json import asyncio from datetime import datetime -from collections import defaultdict -import time +from datetime import timedelta # Import database from services.database import get_db_session +from models.content_strategy_state_models import StreamingCacheState +from models.enhanced_strategy_models import Base # Import authentication middleware from middleware.auth_middleware import get_current_user, get_current_user_with_query_token @@ -32,28 +33,100 @@ from ....utils.constants import ERROR_MESSAGES, SUCCESS_MESSAGES router = APIRouter(tags=["Strategy Streaming"]) -# Cache for streaming endpoints (5 minutes cache) -streaming_cache = defaultdict(dict) -CACHE_DURATION = 300 # 5 minutes +STREAMING_CACHE_TTL_SECONDS = 300 +STREAMING_CACHE_MAX_KEYS_PER_USER = 20 +STREAMING_CACHE_ENDPOINT_VERSION = "v1" -def get_cached_data(cache_key: str) -> Optional[Dict[str, Any]]: - """Get cached data if it exists and is not expired.""" - if cache_key in streaming_cache: - cached_data = streaming_cache[cache_key] - if time.time() - cached_data.get("timestamp", 0) < CACHE_DURATION: - return cached_data.get("data") - return None -def set_cached_data(cache_key: str, data: Dict[str, Any]): - """Set cached data with timestamp.""" - streaming_cache[cache_key] = { - "data": data, - "timestamp": time.time() - } +def _build_cache_key(endpoint_name: str, authenticated_user_id: str) -> str: + """Build namespaced cache key by endpoint version and user.""" + return f"streaming:{STREAMING_CACHE_ENDPOINT_VERSION}:{endpoint_name}:user:{authenticated_user_id}" + + +def get_cached_data(db: Session, authenticated_user_id: str, cache_key: str) -> Optional[Dict[str, Any]]: + """Get cached data from shared DB-backed cache with validation and instrumentation.""" + try: + cache_entry = db.query(StreamingCacheState).filter( + StreamingCacheState.user_id == authenticated_user_id, + StreamingCacheState.cache_key == cache_key, + StreamingCacheState.expires_at > datetime.utcnow() + ).first() + + if not cache_entry: + logger.info(f"📭 Streaming cache MISS | key={cache_key} | user={authenticated_user_id}") + return None + + payload = cache_entry.cache_payload + if not isinstance(payload, dict): + logger.warning(f"⚠️ Streaming cache deserialize failed (payload not dict) | key={cache_key} | user={authenticated_user_id}") + db.delete(cache_entry) + db.commit() + return None + + logger.info(f"📦 Streaming cache HIT | key={cache_key} | user={authenticated_user_id}") + return payload + except Exception as e: + logger.error(f"❌ Streaming cache read error | key={cache_key} | user={authenticated_user_id} | error={str(e)}") + db.rollback() + return None + + +def set_cached_data(db: Session, authenticated_user_id: str, cache_key: str, data: Dict[str, Any]) -> None: + """Store cached data in shared DB-backed cache with TTL, key cap, and serialization checks.""" + try: + if not isinstance(data, dict): + logger.warning(f"⚠️ Streaming cache serialize skipped (data not dict) | key={cache_key} | user={authenticated_user_id}") + return + + serialized_payload = json.loads(json.dumps(data)) + if not isinstance(serialized_payload, dict): + logger.warning(f"⚠️ Streaming cache serialize skipped (post-serialize not dict) | key={cache_key} | user={authenticated_user_id}") + return + + expiry = datetime.utcnow() + timedelta(seconds=STREAMING_CACHE_TTL_SECONDS) + existing = db.query(StreamingCacheState).filter( + StreamingCacheState.user_id == authenticated_user_id, + StreamingCacheState.cache_key == cache_key + ).first() + + if existing: + existing.cache_payload = serialized_payload + existing.expires_at = expiry + else: + db.add(StreamingCacheState( + user_id=authenticated_user_id, + cache_key=cache_key, + cache_payload=serialized_payload, + expires_at=expiry + )) + + db.flush() + + # Max-key policy per user: delete oldest entries beyond cap + entries = db.query(StreamingCacheState).filter( + StreamingCacheState.user_id == authenticated_user_id + ).order_by(StreamingCacheState.updated_at.desc(), StreamingCacheState.id.desc()).all() + + if len(entries) > STREAMING_CACHE_MAX_KEYS_PER_USER: + for stale_entry in entries[STREAMING_CACHE_MAX_KEYS_PER_USER:]: + db.delete(stale_entry) + + db.commit() + logger.info( + f"💾 Streaming cache STORE | key={cache_key} | user={authenticated_user_id} | " + f"ttl={STREAMING_CACHE_TTL_SECONDS}s | max_keys={STREAMING_CACHE_MAX_KEYS_PER_USER}" + ) + except Exception as e: + logger.error(f"❌ Streaming cache write error | key={cache_key} | user={authenticated_user_id} | error={str(e)}") + db.rollback() # Helper function to get database session def get_db(): db = get_db_session() + try: + Base.metadata.create_all(bind=db.bind, tables=[StreamingCacheState.__table__], checkfirst=True) + except Exception as table_error: + logger.warning(f"⚠️ Could not ensure streaming cache table exists: {str(table_error)}") try: yield db finally: @@ -151,8 +224,8 @@ async def stream_strategic_intelligence( logger.info(f"🚀 Starting strategic intelligence stream for authenticated user: {authenticated_user_id}") # Check cache first - cache_key = f"strategic_intelligence_{authenticated_user_id}" - cached_data = get_cached_data(cache_key) + cache_key = _build_cache_key("strategic-intelligence", authenticated_user_id) + cached_data = get_cached_data(db, authenticated_user_id, cache_key) if cached_data: logger.info(f"✅ Returning cached strategic intelligence data for user: {authenticated_user_id}") yield {"type": "result", "status": "success", "data": cached_data, "progress": 100} @@ -241,7 +314,7 @@ async def stream_strategic_intelligence( } # Cache the strategic intelligence data - set_cached_data(cache_key, strategic_intelligence) + set_cached_data(db, authenticated_user_id, cache_key, strategic_intelligence) # Send progress update yield {"type": "progress", "message": "Finalizing strategic intelligence...", "progress": 80} @@ -288,8 +361,8 @@ async def stream_keyword_research( logger.info(f"🚀 Starting keyword research stream for authenticated user: {authenticated_user_id}") # Check cache first - cache_key = f"keyword_research_{authenticated_user_id}" - cached_data = get_cached_data(cache_key) + cache_key = _build_cache_key("keyword-research", authenticated_user_id) + cached_data = get_cached_data(db, authenticated_user_id, cache_key) if cached_data: logger.info(f"✅ Returning cached keyword research data for user: {authenticated_user_id}") yield {"type": "result", "status": "success", "data": cached_data, "progress": 100} @@ -359,7 +432,7 @@ async def stream_keyword_research( } # Cache the keyword data - set_cached_data(cache_key, keyword_data) + set_cached_data(db, authenticated_user_id, cache_key, keyword_data) # Send progress update yield {"type": "progress", "message": "Finalizing keyword research...", "progress": 80}