Compare commits
1 Commits
codex/upda
...
codex/repl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1ba2cb1c44 |
@@ -12,11 +12,12 @@ from loguru import logger
|
|||||||
import json
|
import json
|
||||||
import asyncio
|
import asyncio
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from collections import defaultdict
|
from datetime import timedelta
|
||||||
import time
|
|
||||||
|
|
||||||
# Import database
|
# Import database
|
||||||
from services.database import get_db_session
|
from services.database import get_db_session
|
||||||
|
from models.content_strategy_state_models import StreamingCacheState
|
||||||
|
from models.enhanced_strategy_models import Base
|
||||||
|
|
||||||
# Import authentication middleware
|
# Import authentication middleware
|
||||||
from middleware.auth_middleware import get_current_user, get_current_user_with_query_token
|
from middleware.auth_middleware import get_current_user, get_current_user_with_query_token
|
||||||
@@ -32,28 +33,100 @@ from ....utils.constants import ERROR_MESSAGES, SUCCESS_MESSAGES
|
|||||||
|
|
||||||
router = APIRouter(tags=["Strategy Streaming"])
|
router = APIRouter(tags=["Strategy Streaming"])
|
||||||
|
|
||||||
# Cache for streaming endpoints (5 minutes cache)
|
STREAMING_CACHE_TTL_SECONDS = 300
|
||||||
streaming_cache = defaultdict(dict)
|
STREAMING_CACHE_MAX_KEYS_PER_USER = 20
|
||||||
CACHE_DURATION = 300 # 5 minutes
|
STREAMING_CACHE_ENDPOINT_VERSION = "v1"
|
||||||
|
|
||||||
def get_cached_data(cache_key: str) -> Optional[Dict[str, Any]]:
|
|
||||||
"""Get cached data if it exists and is not expired."""
|
|
||||||
if cache_key in streaming_cache:
|
|
||||||
cached_data = streaming_cache[cache_key]
|
|
||||||
if time.time() - cached_data.get("timestamp", 0) < CACHE_DURATION:
|
|
||||||
return cached_data.get("data")
|
|
||||||
return None
|
|
||||||
|
|
||||||
def set_cached_data(cache_key: str, data: Dict[str, Any]):
|
def _build_cache_key(endpoint_name: str, authenticated_user_id: str) -> str:
|
||||||
"""Set cached data with timestamp."""
|
"""Build namespaced cache key by endpoint version and user."""
|
||||||
streaming_cache[cache_key] = {
|
return f"streaming:{STREAMING_CACHE_ENDPOINT_VERSION}:{endpoint_name}:user:{authenticated_user_id}"
|
||||||
"data": data,
|
|
||||||
"timestamp": time.time()
|
|
||||||
}
|
def get_cached_data(db: Session, authenticated_user_id: str, cache_key: str) -> Optional[Dict[str, Any]]:
|
||||||
|
"""Get cached data from shared DB-backed cache with validation and instrumentation."""
|
||||||
|
try:
|
||||||
|
cache_entry = db.query(StreamingCacheState).filter(
|
||||||
|
StreamingCacheState.user_id == authenticated_user_id,
|
||||||
|
StreamingCacheState.cache_key == cache_key,
|
||||||
|
StreamingCacheState.expires_at > datetime.utcnow()
|
||||||
|
).first()
|
||||||
|
|
||||||
|
if not cache_entry:
|
||||||
|
logger.info(f"📭 Streaming cache MISS | key={cache_key} | user={authenticated_user_id}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
payload = cache_entry.cache_payload
|
||||||
|
if not isinstance(payload, dict):
|
||||||
|
logger.warning(f"⚠️ Streaming cache deserialize failed (payload not dict) | key={cache_key} | user={authenticated_user_id}")
|
||||||
|
db.delete(cache_entry)
|
||||||
|
db.commit()
|
||||||
|
return None
|
||||||
|
|
||||||
|
logger.info(f"📦 Streaming cache HIT | key={cache_key} | user={authenticated_user_id}")
|
||||||
|
return payload
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"❌ Streaming cache read error | key={cache_key} | user={authenticated_user_id} | error={str(e)}")
|
||||||
|
db.rollback()
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def set_cached_data(db: Session, authenticated_user_id: str, cache_key: str, data: Dict[str, Any]) -> None:
|
||||||
|
"""Store cached data in shared DB-backed cache with TTL, key cap, and serialization checks."""
|
||||||
|
try:
|
||||||
|
if not isinstance(data, dict):
|
||||||
|
logger.warning(f"⚠️ Streaming cache serialize skipped (data not dict) | key={cache_key} | user={authenticated_user_id}")
|
||||||
|
return
|
||||||
|
|
||||||
|
serialized_payload = json.loads(json.dumps(data))
|
||||||
|
if not isinstance(serialized_payload, dict):
|
||||||
|
logger.warning(f"⚠️ Streaming cache serialize skipped (post-serialize not dict) | key={cache_key} | user={authenticated_user_id}")
|
||||||
|
return
|
||||||
|
|
||||||
|
expiry = datetime.utcnow() + timedelta(seconds=STREAMING_CACHE_TTL_SECONDS)
|
||||||
|
existing = db.query(StreamingCacheState).filter(
|
||||||
|
StreamingCacheState.user_id == authenticated_user_id,
|
||||||
|
StreamingCacheState.cache_key == cache_key
|
||||||
|
).first()
|
||||||
|
|
||||||
|
if existing:
|
||||||
|
existing.cache_payload = serialized_payload
|
||||||
|
existing.expires_at = expiry
|
||||||
|
else:
|
||||||
|
db.add(StreamingCacheState(
|
||||||
|
user_id=authenticated_user_id,
|
||||||
|
cache_key=cache_key,
|
||||||
|
cache_payload=serialized_payload,
|
||||||
|
expires_at=expiry
|
||||||
|
))
|
||||||
|
|
||||||
|
db.flush()
|
||||||
|
|
||||||
|
# Max-key policy per user: delete oldest entries beyond cap
|
||||||
|
entries = db.query(StreamingCacheState).filter(
|
||||||
|
StreamingCacheState.user_id == authenticated_user_id
|
||||||
|
).order_by(StreamingCacheState.updated_at.desc(), StreamingCacheState.id.desc()).all()
|
||||||
|
|
||||||
|
if len(entries) > STREAMING_CACHE_MAX_KEYS_PER_USER:
|
||||||
|
for stale_entry in entries[STREAMING_CACHE_MAX_KEYS_PER_USER:]:
|
||||||
|
db.delete(stale_entry)
|
||||||
|
|
||||||
|
db.commit()
|
||||||
|
logger.info(
|
||||||
|
f"💾 Streaming cache STORE | key={cache_key} | user={authenticated_user_id} | "
|
||||||
|
f"ttl={STREAMING_CACHE_TTL_SECONDS}s | max_keys={STREAMING_CACHE_MAX_KEYS_PER_USER}"
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"❌ Streaming cache write error | key={cache_key} | user={authenticated_user_id} | error={str(e)}")
|
||||||
|
db.rollback()
|
||||||
|
|
||||||
# Helper function to get database session
|
# Helper function to get database session
|
||||||
def get_db():
|
def get_db():
|
||||||
db = get_db_session()
|
db = get_db_session()
|
||||||
|
try:
|
||||||
|
Base.metadata.create_all(bind=db.bind, tables=[StreamingCacheState.__table__], checkfirst=True)
|
||||||
|
except Exception as table_error:
|
||||||
|
logger.warning(f"⚠️ Could not ensure streaming cache table exists: {str(table_error)}")
|
||||||
try:
|
try:
|
||||||
yield db
|
yield db
|
||||||
finally:
|
finally:
|
||||||
@@ -124,6 +197,10 @@ async def stream_enhanced_strategies(
|
|||||||
headers={
|
headers={
|
||||||
"Cache-Control": "no-cache",
|
"Cache-Control": "no-cache",
|
||||||
"Connection": "keep-alive",
|
"Connection": "keep-alive",
|
||||||
|
"Access-Control-Allow-Origin": "*",
|
||||||
|
"Access-Control-Allow-Headers": "*",
|
||||||
|
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
|
||||||
|
"Access-Control-Allow-Credentials": "true"
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -147,8 +224,8 @@ async def stream_strategic_intelligence(
|
|||||||
logger.info(f"🚀 Starting strategic intelligence stream for authenticated user: {authenticated_user_id}")
|
logger.info(f"🚀 Starting strategic intelligence stream for authenticated user: {authenticated_user_id}")
|
||||||
|
|
||||||
# Check cache first
|
# Check cache first
|
||||||
cache_key = f"strategic_intelligence_{authenticated_user_id}"
|
cache_key = _build_cache_key("strategic-intelligence", authenticated_user_id)
|
||||||
cached_data = get_cached_data(cache_key)
|
cached_data = get_cached_data(db, authenticated_user_id, cache_key)
|
||||||
if cached_data:
|
if cached_data:
|
||||||
logger.info(f"✅ Returning cached strategic intelligence data for user: {authenticated_user_id}")
|
logger.info(f"✅ Returning cached strategic intelligence data for user: {authenticated_user_id}")
|
||||||
yield {"type": "result", "status": "success", "data": cached_data, "progress": 100}
|
yield {"type": "result", "status": "success", "data": cached_data, "progress": 100}
|
||||||
@@ -237,7 +314,7 @@ async def stream_strategic_intelligence(
|
|||||||
}
|
}
|
||||||
|
|
||||||
# Cache the strategic intelligence data
|
# Cache the strategic intelligence data
|
||||||
set_cached_data(cache_key, strategic_intelligence)
|
set_cached_data(db, authenticated_user_id, cache_key, strategic_intelligence)
|
||||||
|
|
||||||
# Send progress update
|
# Send progress update
|
||||||
yield {"type": "progress", "message": "Finalizing strategic intelligence...", "progress": 80}
|
yield {"type": "progress", "message": "Finalizing strategic intelligence...", "progress": 80}
|
||||||
@@ -257,6 +334,10 @@ async def stream_strategic_intelligence(
|
|||||||
headers={
|
headers={
|
||||||
"Cache-Control": "no-cache",
|
"Cache-Control": "no-cache",
|
||||||
"Connection": "keep-alive",
|
"Connection": "keep-alive",
|
||||||
|
"Access-Control-Allow-Origin": "*",
|
||||||
|
"Access-Control-Allow-Headers": "*",
|
||||||
|
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
|
||||||
|
"Access-Control-Allow-Credentials": "true"
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -280,8 +361,8 @@ async def stream_keyword_research(
|
|||||||
logger.info(f"🚀 Starting keyword research stream for authenticated user: {authenticated_user_id}")
|
logger.info(f"🚀 Starting keyword research stream for authenticated user: {authenticated_user_id}")
|
||||||
|
|
||||||
# Check cache first
|
# Check cache first
|
||||||
cache_key = f"keyword_research_{authenticated_user_id}"
|
cache_key = _build_cache_key("keyword-research", authenticated_user_id)
|
||||||
cached_data = get_cached_data(cache_key)
|
cached_data = get_cached_data(db, authenticated_user_id, cache_key)
|
||||||
if cached_data:
|
if cached_data:
|
||||||
logger.info(f"✅ Returning cached keyword research data for user: {authenticated_user_id}")
|
logger.info(f"✅ Returning cached keyword research data for user: {authenticated_user_id}")
|
||||||
yield {"type": "result", "status": "success", "data": cached_data, "progress": 100}
|
yield {"type": "result", "status": "success", "data": cached_data, "progress": 100}
|
||||||
@@ -351,7 +432,7 @@ async def stream_keyword_research(
|
|||||||
}
|
}
|
||||||
|
|
||||||
# Cache the keyword data
|
# Cache the keyword data
|
||||||
set_cached_data(cache_key, keyword_data)
|
set_cached_data(db, authenticated_user_id, cache_key, keyword_data)
|
||||||
|
|
||||||
# Send progress update
|
# Send progress update
|
||||||
yield {"type": "progress", "message": "Finalizing keyword research...", "progress": 80}
|
yield {"type": "progress", "message": "Finalizing keyword research...", "progress": 80}
|
||||||
|
|||||||
Reference in New Issue
Block a user