Compare commits

..

1 Commits

Author SHA1 Message Date
ي
390ac3488a Fix SSE CORS headers for credentialed streaming endpoints 2026-05-28 09:19:26 +05:30

View File

@@ -12,12 +12,11 @@ from loguru import logger
import json import json
import asyncio import asyncio
from datetime import datetime from datetime import datetime
from datetime import timedelta from collections import defaultdict
import time
# Import database # Import database
from services.database import get_db_session from services.database import get_db_session
from models.content_strategy_state_models import StreamingCacheState
from models.enhanced_strategy_models import Base
# Import authentication middleware # Import authentication middleware
from middleware.auth_middleware import get_current_user, get_current_user_with_query_token from middleware.auth_middleware import get_current_user, get_current_user_with_query_token
@@ -33,100 +32,28 @@ from ....utils.constants import ERROR_MESSAGES, SUCCESS_MESSAGES
router = APIRouter(tags=["Strategy Streaming"]) router = APIRouter(tags=["Strategy Streaming"])
STREAMING_CACHE_TTL_SECONDS = 300 # Cache for streaming endpoints (5 minutes cache)
STREAMING_CACHE_MAX_KEYS_PER_USER = 20 streaming_cache = defaultdict(dict)
STREAMING_CACHE_ENDPOINT_VERSION = "v1" CACHE_DURATION = 300 # 5 minutes
def get_cached_data(cache_key: str) -> Optional[Dict[str, Any]]:
"""Get cached data if it exists and is not expired."""
if cache_key in streaming_cache:
cached_data = streaming_cache[cache_key]
if time.time() - cached_data.get("timestamp", 0) < CACHE_DURATION:
return cached_data.get("data")
return None
def _build_cache_key(endpoint_name: str, authenticated_user_id: str) -> str: def set_cached_data(cache_key: str, data: Dict[str, Any]):
"""Build namespaced cache key by endpoint version and user.""" """Set cached data with timestamp."""
return f"streaming:{STREAMING_CACHE_ENDPOINT_VERSION}:{endpoint_name}:user:{authenticated_user_id}" streaming_cache[cache_key] = {
"data": data,
"timestamp": time.time()
def get_cached_data(db: Session, authenticated_user_id: str, cache_key: str) -> Optional[Dict[str, Any]]: }
"""Get cached data from shared DB-backed cache with validation and instrumentation."""
try:
cache_entry = db.query(StreamingCacheState).filter(
StreamingCacheState.user_id == authenticated_user_id,
StreamingCacheState.cache_key == cache_key,
StreamingCacheState.expires_at > datetime.utcnow()
).first()
if not cache_entry:
logger.info(f"📭 Streaming cache MISS | key={cache_key} | user={authenticated_user_id}")
return None
payload = cache_entry.cache_payload
if not isinstance(payload, dict):
logger.warning(f"⚠️ Streaming cache deserialize failed (payload not dict) | key={cache_key} | user={authenticated_user_id}")
db.delete(cache_entry)
db.commit()
return None
logger.info(f"📦 Streaming cache HIT | key={cache_key} | user={authenticated_user_id}")
return payload
except Exception as e:
logger.error(f"❌ Streaming cache read error | key={cache_key} | user={authenticated_user_id} | error={str(e)}")
db.rollback()
return None
def set_cached_data(db: Session, authenticated_user_id: str, cache_key: str, data: Dict[str, Any]) -> None:
"""Store cached data in shared DB-backed cache with TTL, key cap, and serialization checks."""
try:
if not isinstance(data, dict):
logger.warning(f"⚠️ Streaming cache serialize skipped (data not dict) | key={cache_key} | user={authenticated_user_id}")
return
serialized_payload = json.loads(json.dumps(data))
if not isinstance(serialized_payload, dict):
logger.warning(f"⚠️ Streaming cache serialize skipped (post-serialize not dict) | key={cache_key} | user={authenticated_user_id}")
return
expiry = datetime.utcnow() + timedelta(seconds=STREAMING_CACHE_TTL_SECONDS)
existing = db.query(StreamingCacheState).filter(
StreamingCacheState.user_id == authenticated_user_id,
StreamingCacheState.cache_key == cache_key
).first()
if existing:
existing.cache_payload = serialized_payload
existing.expires_at = expiry
else:
db.add(StreamingCacheState(
user_id=authenticated_user_id,
cache_key=cache_key,
cache_payload=serialized_payload,
expires_at=expiry
))
db.flush()
# Max-key policy per user: delete oldest entries beyond cap
entries = db.query(StreamingCacheState).filter(
StreamingCacheState.user_id == authenticated_user_id
).order_by(StreamingCacheState.updated_at.desc(), StreamingCacheState.id.desc()).all()
if len(entries) > STREAMING_CACHE_MAX_KEYS_PER_USER:
for stale_entry in entries[STREAMING_CACHE_MAX_KEYS_PER_USER:]:
db.delete(stale_entry)
db.commit()
logger.info(
f"💾 Streaming cache STORE | key={cache_key} | user={authenticated_user_id} | "
f"ttl={STREAMING_CACHE_TTL_SECONDS}s | max_keys={STREAMING_CACHE_MAX_KEYS_PER_USER}"
)
except Exception as e:
logger.error(f"❌ Streaming cache write error | key={cache_key} | user={authenticated_user_id} | error={str(e)}")
db.rollback()
# Helper function to get database session # Helper function to get database session
def get_db(): def get_db():
db = get_db_session() db = get_db_session()
try:
Base.metadata.create_all(bind=db.bind, tables=[StreamingCacheState.__table__], checkfirst=True)
except Exception as table_error:
logger.warning(f"⚠️ Could not ensure streaming cache table exists: {str(table_error)}")
try: try:
yield db yield db
finally: finally:
@@ -197,10 +124,6 @@ async def stream_enhanced_strategies(
headers={ headers={
"Cache-Control": "no-cache", "Cache-Control": "no-cache",
"Connection": "keep-alive", "Connection": "keep-alive",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Headers": "*",
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
"Access-Control-Allow-Credentials": "true"
} }
) )
@@ -224,8 +147,8 @@ async def stream_strategic_intelligence(
logger.info(f"🚀 Starting strategic intelligence stream for authenticated user: {authenticated_user_id}") logger.info(f"🚀 Starting strategic intelligence stream for authenticated user: {authenticated_user_id}")
# Check cache first # Check cache first
cache_key = _build_cache_key("strategic-intelligence", authenticated_user_id) cache_key = f"strategic_intelligence_{authenticated_user_id}"
cached_data = get_cached_data(db, authenticated_user_id, cache_key) cached_data = get_cached_data(cache_key)
if cached_data: if cached_data:
logger.info(f"✅ Returning cached strategic intelligence data for user: {authenticated_user_id}") logger.info(f"✅ Returning cached strategic intelligence data for user: {authenticated_user_id}")
yield {"type": "result", "status": "success", "data": cached_data, "progress": 100} yield {"type": "result", "status": "success", "data": cached_data, "progress": 100}
@@ -314,7 +237,7 @@ async def stream_strategic_intelligence(
} }
# Cache the strategic intelligence data # Cache the strategic intelligence data
set_cached_data(db, authenticated_user_id, cache_key, strategic_intelligence) set_cached_data(cache_key, strategic_intelligence)
# Send progress update # Send progress update
yield {"type": "progress", "message": "Finalizing strategic intelligence...", "progress": 80} yield {"type": "progress", "message": "Finalizing strategic intelligence...", "progress": 80}
@@ -334,10 +257,6 @@ async def stream_strategic_intelligence(
headers={ headers={
"Cache-Control": "no-cache", "Cache-Control": "no-cache",
"Connection": "keep-alive", "Connection": "keep-alive",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Headers": "*",
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
"Access-Control-Allow-Credentials": "true"
} }
) )
@@ -361,8 +280,8 @@ async def stream_keyword_research(
logger.info(f"🚀 Starting keyword research stream for authenticated user: {authenticated_user_id}") logger.info(f"🚀 Starting keyword research stream for authenticated user: {authenticated_user_id}")
# Check cache first # Check cache first
cache_key = _build_cache_key("keyword-research", authenticated_user_id) cache_key = f"keyword_research_{authenticated_user_id}"
cached_data = get_cached_data(db, authenticated_user_id, cache_key) cached_data = get_cached_data(cache_key)
if cached_data: if cached_data:
logger.info(f"✅ Returning cached keyword research data for user: {authenticated_user_id}") logger.info(f"✅ Returning cached keyword research data for user: {authenticated_user_id}")
yield {"type": "result", "status": "success", "data": cached_data, "progress": 100} yield {"type": "result", "status": "success", "data": cached_data, "progress": 100}
@@ -432,7 +351,7 @@ async def stream_keyword_research(
} }
# Cache the keyword data # Cache the keyword data
set_cached_data(db, authenticated_user_id, cache_key, keyword_data) set_cached_data(cache_key, keyword_data)
# Send progress update # Send progress update
yield {"type": "progress", "message": "Finalizing keyword research...", "progress": 80} yield {"type": "progress", "message": "Finalizing keyword research...", "progress": 80}