Compare commits
1 Commits
codex/repl
...
codex/refa
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0d8824c223 |
@@ -12,12 +12,11 @@ from loguru import logger
|
||||
import json
|
||||
import asyncio
|
||||
from datetime import datetime
|
||||
from datetime import timedelta
|
||||
from collections import defaultdict
|
||||
import time
|
||||
|
||||
# Import database
|
||||
from services.database import get_db_session
|
||||
from models.content_strategy_state_models import StreamingCacheState
|
||||
from models.enhanced_strategy_models import Base
|
||||
|
||||
# Import authentication middleware
|
||||
from middleware.auth_middleware import get_current_user, get_current_user_with_query_token
|
||||
@@ -33,100 +32,28 @@ from ....utils.constants import ERROR_MESSAGES, SUCCESS_MESSAGES
|
||||
|
||||
router = APIRouter(tags=["Strategy Streaming"])
|
||||
|
||||
STREAMING_CACHE_TTL_SECONDS = 300
|
||||
STREAMING_CACHE_MAX_KEYS_PER_USER = 20
|
||||
STREAMING_CACHE_ENDPOINT_VERSION = "v1"
|
||||
# Cache for streaming endpoints (5 minutes cache)
|
||||
streaming_cache = defaultdict(dict)
|
||||
CACHE_DURATION = 300 # 5 minutes
|
||||
|
||||
def get_cached_data(cache_key: str) -> Optional[Dict[str, Any]]:
|
||||
"""Get cached data if it exists and is not expired."""
|
||||
if cache_key in streaming_cache:
|
||||
cached_data = streaming_cache[cache_key]
|
||||
if time.time() - cached_data.get("timestamp", 0) < CACHE_DURATION:
|
||||
return cached_data.get("data")
|
||||
return None
|
||||
|
||||
def _build_cache_key(endpoint_name: str, authenticated_user_id: str) -> str:
|
||||
"""Build namespaced cache key by endpoint version and user."""
|
||||
return f"streaming:{STREAMING_CACHE_ENDPOINT_VERSION}:{endpoint_name}:user:{authenticated_user_id}"
|
||||
|
||||
|
||||
def get_cached_data(db: Session, authenticated_user_id: str, cache_key: str) -> Optional[Dict[str, Any]]:
|
||||
"""Get cached data from shared DB-backed cache with validation and instrumentation."""
|
||||
try:
|
||||
cache_entry = db.query(StreamingCacheState).filter(
|
||||
StreamingCacheState.user_id == authenticated_user_id,
|
||||
StreamingCacheState.cache_key == cache_key,
|
||||
StreamingCacheState.expires_at > datetime.utcnow()
|
||||
).first()
|
||||
|
||||
if not cache_entry:
|
||||
logger.info(f"📭 Streaming cache MISS | key={cache_key} | user={authenticated_user_id}")
|
||||
return None
|
||||
|
||||
payload = cache_entry.cache_payload
|
||||
if not isinstance(payload, dict):
|
||||
logger.warning(f"⚠️ Streaming cache deserialize failed (payload not dict) | key={cache_key} | user={authenticated_user_id}")
|
||||
db.delete(cache_entry)
|
||||
db.commit()
|
||||
return None
|
||||
|
||||
logger.info(f"📦 Streaming cache HIT | key={cache_key} | user={authenticated_user_id}")
|
||||
return payload
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Streaming cache read error | key={cache_key} | user={authenticated_user_id} | error={str(e)}")
|
||||
db.rollback()
|
||||
return None
|
||||
|
||||
|
||||
def set_cached_data(db: Session, authenticated_user_id: str, cache_key: str, data: Dict[str, Any]) -> None:
|
||||
"""Store cached data in shared DB-backed cache with TTL, key cap, and serialization checks."""
|
||||
try:
|
||||
if not isinstance(data, dict):
|
||||
logger.warning(f"⚠️ Streaming cache serialize skipped (data not dict) | key={cache_key} | user={authenticated_user_id}")
|
||||
return
|
||||
|
||||
serialized_payload = json.loads(json.dumps(data))
|
||||
if not isinstance(serialized_payload, dict):
|
||||
logger.warning(f"⚠️ Streaming cache serialize skipped (post-serialize not dict) | key={cache_key} | user={authenticated_user_id}")
|
||||
return
|
||||
|
||||
expiry = datetime.utcnow() + timedelta(seconds=STREAMING_CACHE_TTL_SECONDS)
|
||||
existing = db.query(StreamingCacheState).filter(
|
||||
StreamingCacheState.user_id == authenticated_user_id,
|
||||
StreamingCacheState.cache_key == cache_key
|
||||
).first()
|
||||
|
||||
if existing:
|
||||
existing.cache_payload = serialized_payload
|
||||
existing.expires_at = expiry
|
||||
else:
|
||||
db.add(StreamingCacheState(
|
||||
user_id=authenticated_user_id,
|
||||
cache_key=cache_key,
|
||||
cache_payload=serialized_payload,
|
||||
expires_at=expiry
|
||||
))
|
||||
|
||||
db.flush()
|
||||
|
||||
# Max-key policy per user: delete oldest entries beyond cap
|
||||
entries = db.query(StreamingCacheState).filter(
|
||||
StreamingCacheState.user_id == authenticated_user_id
|
||||
).order_by(StreamingCacheState.updated_at.desc(), StreamingCacheState.id.desc()).all()
|
||||
|
||||
if len(entries) > STREAMING_CACHE_MAX_KEYS_PER_USER:
|
||||
for stale_entry in entries[STREAMING_CACHE_MAX_KEYS_PER_USER:]:
|
||||
db.delete(stale_entry)
|
||||
|
||||
db.commit()
|
||||
logger.info(
|
||||
f"💾 Streaming cache STORE | key={cache_key} | user={authenticated_user_id} | "
|
||||
f"ttl={STREAMING_CACHE_TTL_SECONDS}s | max_keys={STREAMING_CACHE_MAX_KEYS_PER_USER}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Streaming cache write error | key={cache_key} | user={authenticated_user_id} | error={str(e)}")
|
||||
db.rollback()
|
||||
def set_cached_data(cache_key: str, data: Dict[str, Any]):
|
||||
"""Set cached data with timestamp."""
|
||||
streaming_cache[cache_key] = {
|
||||
"data": data,
|
||||
"timestamp": time.time()
|
||||
}
|
||||
|
||||
# Helper function to get database session
|
||||
def get_db():
|
||||
db = get_db_session()
|
||||
try:
|
||||
Base.metadata.create_all(bind=db.bind, tables=[StreamingCacheState.__table__], checkfirst=True)
|
||||
except Exception as table_error:
|
||||
logger.warning(f"⚠️ Could not ensure streaming cache table exists: {str(table_error)}")
|
||||
try:
|
||||
yield db
|
||||
finally:
|
||||
@@ -224,8 +151,8 @@ async def stream_strategic_intelligence(
|
||||
logger.info(f"🚀 Starting strategic intelligence stream for authenticated user: {authenticated_user_id}")
|
||||
|
||||
# Check cache first
|
||||
cache_key = _build_cache_key("strategic-intelligence", authenticated_user_id)
|
||||
cached_data = get_cached_data(db, authenticated_user_id, cache_key)
|
||||
cache_key = f"strategic_intelligence_{authenticated_user_id}"
|
||||
cached_data = get_cached_data(cache_key)
|
||||
if cached_data:
|
||||
logger.info(f"✅ Returning cached strategic intelligence data for user: {authenticated_user_id}")
|
||||
yield {"type": "result", "status": "success", "data": cached_data, "progress": 100}
|
||||
@@ -267,60 +194,86 @@ async def stream_strategic_intelligence(
|
||||
# Send progress update
|
||||
yield {"type": "progress", "message": "Processing intelligence data...", "progress": 60}
|
||||
|
||||
def _ensure_list(value: Any) -> list:
|
||||
if isinstance(value, list):
|
||||
return [item for item in value if item is not None]
|
||||
return []
|
||||
|
||||
competitors = _ensure_list(strategy.get("top_competitors"))[:3]
|
||||
market_gaps = _ensure_list(strategy.get("market_gaps"))
|
||||
|
||||
raw_insights = ai_recommendations.get("strategic_insights")
|
||||
if isinstance(raw_insights, dict):
|
||||
ai_insights = [raw_insights]
|
||||
elif isinstance(raw_insights, list):
|
||||
ai_insights = [item for item in raw_insights if item is not None]
|
||||
else:
|
||||
ai_insights = []
|
||||
|
||||
opportunity_candidates = [
|
||||
ai_recommendations.get("opportunity_analysis"),
|
||||
ai_recommendations.get("opportunities"),
|
||||
ai_recommendations.get("strategic_opportunities"),
|
||||
]
|
||||
opportunities = []
|
||||
for candidate in opportunity_candidates:
|
||||
if isinstance(candidate, list) and candidate:
|
||||
opportunities = [item for item in candidate if item is not None]
|
||||
break
|
||||
|
||||
persisted_current_position = strategy.get("competitive_position")
|
||||
ai_positioning = ai_recommendations.get("market_positioning") if isinstance(ai_recommendations.get("market_positioning"), dict) else {}
|
||||
current_position = persisted_current_position or ai_positioning.get("current_position")
|
||||
target_position = ai_positioning.get("target_position")
|
||||
differentiation_factors = _ensure_list(ai_positioning.get("differentiation_factors"))
|
||||
|
||||
has_required_signals = bool(current_position and competitors and market_gaps and ai_insights and opportunities)
|
||||
status = "success" if has_required_signals else "partial_incomplete"
|
||||
|
||||
source_flags = {
|
||||
"current_position": "user_or_database" if persisted_current_position else ("model" if ai_positioning.get("current_position") else "insufficient_data"),
|
||||
"target_position": "model" if target_position else "insufficient_data",
|
||||
"differentiation_factors": "model" if differentiation_factors else "insufficient_data",
|
||||
"top_competitors": "user_or_database" if competitors else "insufficient_data",
|
||||
"market_gaps": "user_or_database" if market_gaps else "insufficient_data",
|
||||
"ai_insights": "model" if ai_insights else "insufficient_data",
|
||||
"opportunities": "model" if opportunities else "insufficient_data"
|
||||
}
|
||||
|
||||
missing_signals = [key for key, value in {
|
||||
"market_positioning.current_position": bool(current_position),
|
||||
"competitive_analysis.top_competitors": bool(competitors),
|
||||
"competitive_analysis.market_gaps": bool(market_gaps),
|
||||
"ai_insights": bool(ai_insights),
|
||||
"opportunities": bool(opportunities),
|
||||
}.items() if not value]
|
||||
|
||||
strategic_intelligence = {
|
||||
"status": status,
|
||||
"is_model_derived": any(source == "model" for source in source_flags.values()),
|
||||
"data_source_flags": source_flags,
|
||||
"missing_signals": missing_signals,
|
||||
"market_positioning": {
|
||||
"current_position": strategy.get("competitive_position", "Challenger"),
|
||||
"target_position": "Market Leader",
|
||||
"differentiation_factors": [
|
||||
"AI-powered content optimization",
|
||||
"Data-driven strategy development",
|
||||
"Personalized user experience"
|
||||
]
|
||||
"current_position": current_position,
|
||||
"target_position": target_position,
|
||||
"differentiation_factors": differentiation_factors,
|
||||
},
|
||||
"competitive_analysis": {
|
||||
"top_competitors": strategy.get("top_competitors", [])[:3] or [
|
||||
"Competitor A", "Competitor B", "Competitor C"
|
||||
],
|
||||
"competitive_advantages": [
|
||||
"Advanced AI capabilities",
|
||||
"Comprehensive data integration",
|
||||
"User-centric design"
|
||||
],
|
||||
"market_gaps": strategy.get("market_gaps", []) or [
|
||||
"AI-driven content personalization",
|
||||
"Real-time performance optimization",
|
||||
"Predictive analytics"
|
||||
]
|
||||
"top_competitors": competitors,
|
||||
"market_gaps": market_gaps,
|
||||
},
|
||||
"ai_insights": ai_recommendations.get("strategic_insights", []) or [
|
||||
"Focus on pillar content strategy",
|
||||
"Implement topic clustering",
|
||||
"Optimize for voice search"
|
||||
],
|
||||
"opportunities": [
|
||||
{
|
||||
"area": "Content Personalization",
|
||||
"potential_impact": "High",
|
||||
"implementation_timeline": "3-6 months",
|
||||
"estimated_roi": "25-40%"
|
||||
},
|
||||
{
|
||||
"area": "AI-Powered Optimization",
|
||||
"potential_impact": "Medium",
|
||||
"implementation_timeline": "6-12 months",
|
||||
"estimated_roi": "15-30%"
|
||||
}
|
||||
]
|
||||
"ai_insights": ai_insights,
|
||||
"opportunities": opportunities,
|
||||
}
|
||||
|
||||
# Cache the strategic intelligence data
|
||||
set_cached_data(db, authenticated_user_id, cache_key, strategic_intelligence)
|
||||
set_cached_data(cache_key, strategic_intelligence)
|
||||
|
||||
# Send progress update
|
||||
yield {"type": "progress", "message": "Finalizing strategic intelligence...", "progress": 80}
|
||||
|
||||
# Send final result
|
||||
yield {"type": "result", "status": "success", "data": strategic_intelligence, "progress": 100}
|
||||
yield {"type": "result", "status": status, "data": strategic_intelligence, "progress": 100}
|
||||
|
||||
logger.info(f"✅ Strategic intelligence stream completed for user: {authenticated_user_id}")
|
||||
|
||||
@@ -361,8 +314,8 @@ async def stream_keyword_research(
|
||||
logger.info(f"🚀 Starting keyword research stream for authenticated user: {authenticated_user_id}")
|
||||
|
||||
# Check cache first
|
||||
cache_key = _build_cache_key("keyword-research", authenticated_user_id)
|
||||
cached_data = get_cached_data(db, authenticated_user_id, cache_key)
|
||||
cache_key = f"keyword_research_{authenticated_user_id}"
|
||||
cached_data = get_cached_data(cache_key)
|
||||
if cached_data:
|
||||
logger.info(f"✅ Returning cached keyword research data for user: {authenticated_user_id}")
|
||||
yield {"type": "result", "status": "success", "data": cached_data, "progress": 100}
|
||||
@@ -432,7 +385,7 @@ async def stream_keyword_research(
|
||||
}
|
||||
|
||||
# Cache the keyword data
|
||||
set_cached_data(db, authenticated_user_id, cache_key, keyword_data)
|
||||
set_cached_data(cache_key, keyword_data)
|
||||
|
||||
# Send progress update
|
||||
yield {"type": "progress", "message": "Finalizing keyword research...", "progress": 80}
|
||||
|
||||
Reference in New Issue
Block a user