Compare commits
1 Commits
codex/impl
...
codex/repl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1ba2cb1c44 |
@@ -12,11 +12,12 @@ from loguru import logger
|
||||
import json
|
||||
import asyncio
|
||||
from datetime import datetime
|
||||
from collections import defaultdict
|
||||
import time
|
||||
from datetime import timedelta
|
||||
|
||||
# Import database
|
||||
from services.database import get_db_session
|
||||
from models.content_strategy_state_models import StreamingCacheState
|
||||
from models.enhanced_strategy_models import Base
|
||||
|
||||
# Import authentication middleware
|
||||
from middleware.auth_middleware import get_current_user, get_current_user_with_query_token
|
||||
@@ -32,28 +33,100 @@ from ....utils.constants import ERROR_MESSAGES, SUCCESS_MESSAGES
|
||||
|
||||
router = APIRouter(tags=["Strategy Streaming"])
|
||||
|
||||
# Cache for streaming endpoints (5 minutes cache)
|
||||
streaming_cache = defaultdict(dict)
|
||||
CACHE_DURATION = 300 # 5 minutes
|
||||
STREAMING_CACHE_TTL_SECONDS = 300
|
||||
STREAMING_CACHE_MAX_KEYS_PER_USER = 20
|
||||
STREAMING_CACHE_ENDPOINT_VERSION = "v1"
|
||||
|
||||
def get_cached_data(cache_key: str) -> Optional[Dict[str, Any]]:
|
||||
"""Get cached data if it exists and is not expired."""
|
||||
if cache_key in streaming_cache:
|
||||
cached_data = streaming_cache[cache_key]
|
||||
if time.time() - cached_data.get("timestamp", 0) < CACHE_DURATION:
|
||||
return cached_data.get("data")
|
||||
return None
|
||||
|
||||
def set_cached_data(cache_key: str, data: Dict[str, Any]):
|
||||
"""Set cached data with timestamp."""
|
||||
streaming_cache[cache_key] = {
|
||||
"data": data,
|
||||
"timestamp": time.time()
|
||||
}
|
||||
def _build_cache_key(endpoint_name: str, authenticated_user_id: str) -> str:
|
||||
"""Build namespaced cache key by endpoint version and user."""
|
||||
return f"streaming:{STREAMING_CACHE_ENDPOINT_VERSION}:{endpoint_name}:user:{authenticated_user_id}"
|
||||
|
||||
|
||||
def get_cached_data(db: Session, authenticated_user_id: str, cache_key: str) -> Optional[Dict[str, Any]]:
|
||||
"""Get cached data from shared DB-backed cache with validation and instrumentation."""
|
||||
try:
|
||||
cache_entry = db.query(StreamingCacheState).filter(
|
||||
StreamingCacheState.user_id == authenticated_user_id,
|
||||
StreamingCacheState.cache_key == cache_key,
|
||||
StreamingCacheState.expires_at > datetime.utcnow()
|
||||
).first()
|
||||
|
||||
if not cache_entry:
|
||||
logger.info(f"📭 Streaming cache MISS | key={cache_key} | user={authenticated_user_id}")
|
||||
return None
|
||||
|
||||
payload = cache_entry.cache_payload
|
||||
if not isinstance(payload, dict):
|
||||
logger.warning(f"⚠️ Streaming cache deserialize failed (payload not dict) | key={cache_key} | user={authenticated_user_id}")
|
||||
db.delete(cache_entry)
|
||||
db.commit()
|
||||
return None
|
||||
|
||||
logger.info(f"📦 Streaming cache HIT | key={cache_key} | user={authenticated_user_id}")
|
||||
return payload
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Streaming cache read error | key={cache_key} | user={authenticated_user_id} | error={str(e)}")
|
||||
db.rollback()
|
||||
return None
|
||||
|
||||
|
||||
def set_cached_data(db: Session, authenticated_user_id: str, cache_key: str, data: Dict[str, Any]) -> None:
|
||||
"""Store cached data in shared DB-backed cache with TTL, key cap, and serialization checks."""
|
||||
try:
|
||||
if not isinstance(data, dict):
|
||||
logger.warning(f"⚠️ Streaming cache serialize skipped (data not dict) | key={cache_key} | user={authenticated_user_id}")
|
||||
return
|
||||
|
||||
serialized_payload = json.loads(json.dumps(data))
|
||||
if not isinstance(serialized_payload, dict):
|
||||
logger.warning(f"⚠️ Streaming cache serialize skipped (post-serialize not dict) | key={cache_key} | user={authenticated_user_id}")
|
||||
return
|
||||
|
||||
expiry = datetime.utcnow() + timedelta(seconds=STREAMING_CACHE_TTL_SECONDS)
|
||||
existing = db.query(StreamingCacheState).filter(
|
||||
StreamingCacheState.user_id == authenticated_user_id,
|
||||
StreamingCacheState.cache_key == cache_key
|
||||
).first()
|
||||
|
||||
if existing:
|
||||
existing.cache_payload = serialized_payload
|
||||
existing.expires_at = expiry
|
||||
else:
|
||||
db.add(StreamingCacheState(
|
||||
user_id=authenticated_user_id,
|
||||
cache_key=cache_key,
|
||||
cache_payload=serialized_payload,
|
||||
expires_at=expiry
|
||||
))
|
||||
|
||||
db.flush()
|
||||
|
||||
# Max-key policy per user: delete oldest entries beyond cap
|
||||
entries = db.query(StreamingCacheState).filter(
|
||||
StreamingCacheState.user_id == authenticated_user_id
|
||||
).order_by(StreamingCacheState.updated_at.desc(), StreamingCacheState.id.desc()).all()
|
||||
|
||||
if len(entries) > STREAMING_CACHE_MAX_KEYS_PER_USER:
|
||||
for stale_entry in entries[STREAMING_CACHE_MAX_KEYS_PER_USER:]:
|
||||
db.delete(stale_entry)
|
||||
|
||||
db.commit()
|
||||
logger.info(
|
||||
f"💾 Streaming cache STORE | key={cache_key} | user={authenticated_user_id} | "
|
||||
f"ttl={STREAMING_CACHE_TTL_SECONDS}s | max_keys={STREAMING_CACHE_MAX_KEYS_PER_USER}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Streaming cache write error | key={cache_key} | user={authenticated_user_id} | error={str(e)}")
|
||||
db.rollback()
|
||||
|
||||
# Helper function to get database session
|
||||
def get_db():
|
||||
db = get_db_session()
|
||||
try:
|
||||
Base.metadata.create_all(bind=db.bind, tables=[StreamingCacheState.__table__], checkfirst=True)
|
||||
except Exception as table_error:
|
||||
logger.warning(f"⚠️ Could not ensure streaming cache table exists: {str(table_error)}")
|
||||
try:
|
||||
yield db
|
||||
finally:
|
||||
@@ -151,8 +224,8 @@ async def stream_strategic_intelligence(
|
||||
logger.info(f"🚀 Starting strategic intelligence stream for authenticated user: {authenticated_user_id}")
|
||||
|
||||
# Check cache first
|
||||
cache_key = f"strategic_intelligence_{authenticated_user_id}"
|
||||
cached_data = get_cached_data(cache_key)
|
||||
cache_key = _build_cache_key("strategic-intelligence", authenticated_user_id)
|
||||
cached_data = get_cached_data(db, authenticated_user_id, cache_key)
|
||||
if cached_data:
|
||||
logger.info(f"✅ Returning cached strategic intelligence data for user: {authenticated_user_id}")
|
||||
yield {"type": "result", "status": "success", "data": cached_data, "progress": 100}
|
||||
@@ -241,7 +314,7 @@ async def stream_strategic_intelligence(
|
||||
}
|
||||
|
||||
# Cache the strategic intelligence data
|
||||
set_cached_data(cache_key, strategic_intelligence)
|
||||
set_cached_data(db, authenticated_user_id, cache_key, strategic_intelligence)
|
||||
|
||||
# Send progress update
|
||||
yield {"type": "progress", "message": "Finalizing strategic intelligence...", "progress": 80}
|
||||
@@ -288,8 +361,8 @@ async def stream_keyword_research(
|
||||
logger.info(f"🚀 Starting keyword research stream for authenticated user: {authenticated_user_id}")
|
||||
|
||||
# Check cache first
|
||||
cache_key = f"keyword_research_{authenticated_user_id}"
|
||||
cached_data = get_cached_data(cache_key)
|
||||
cache_key = _build_cache_key("keyword-research", authenticated_user_id)
|
||||
cached_data = get_cached_data(db, authenticated_user_id, cache_key)
|
||||
if cached_data:
|
||||
logger.info(f"✅ Returning cached keyword research data for user: {authenticated_user_id}")
|
||||
yield {"type": "result", "status": "success", "data": cached_data, "progress": 100}
|
||||
@@ -359,7 +432,7 @@ async def stream_keyword_research(
|
||||
}
|
||||
|
||||
# Cache the keyword data
|
||||
set_cached_data(cache_key, keyword_data)
|
||||
set_cached_data(db, authenticated_user_id, cache_key, keyword_data)
|
||||
|
||||
# Send progress update
|
||||
yield {"type": "progress", "message": "Finalizing keyword research...", "progress": 80}
|
||||
|
||||
@@ -4,28 +4,22 @@ AI response quality assessment and strategic analysis.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Dict, Any, List, Tuple
|
||||
from typing import Dict, Any, List
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class QualityValidationService:
|
||||
"""Service for quality validation and strategic analysis."""
|
||||
|
||||
_RECOMMENDATION_FIELDS = ("recommendation", "title", "action", "description")
|
||||
_EVIDENCE_FIELDS = ("evidence", "rationale", "reason", "justification", "supporting_data")
|
||||
_SPECIFICITY_FIELDS = ("owner", "timeline", "kpi", "metric", "target", "channel", "audience")
|
||||
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
|
||||
def validate_against_schema(self, data: Dict[str, Any], schema: Dict[str, Any]) -> None:
|
||||
"""Validate data against a minimal JSON-like schema definition.
|
||||
Raises ValueError on failure.
|
||||
Schema format example:
|
||||
{"type": "object", "required": ["strategy_brief", "channels"], "properties": {"strategy_brief": {"type": "object"}, "channels": {"type": "array"}}}
|
||||
"""
|
||||
|
||||
def _check(node, sch, path="$"):
|
||||
t = sch.get("type")
|
||||
if t == "object":
|
||||
@@ -57,193 +51,127 @@ class QualityValidationService:
|
||||
return
|
||||
else:
|
||||
return
|
||||
|
||||
_check(data, schema)
|
||||
|
||||
def _safe_text(self, value: Any) -> str:
|
||||
return value.strip() if isinstance(value, str) else ""
|
||||
|
||||
def _normalize_recommendations(self, ai_recommendations: Dict[str, Any]) -> Tuple[List[Dict[str, Any]], List[Dict[str, str]]]:
|
||||
"""Flatten heterogeneous AI payload into normalized recommendation entries."""
|
||||
entries: List[Dict[str, Any]] = []
|
||||
failures: List[Dict[str, str]] = []
|
||||
|
||||
if not isinstance(ai_recommendations, dict):
|
||||
failures.append({"error": "invalid_root", "detail": "ai_recommendations must be a dictionary"})
|
||||
return entries, failures
|
||||
|
||||
for section, payload in ai_recommendations.items():
|
||||
if not isinstance(payload, dict):
|
||||
failures.append({"section": str(section), "error": "invalid_section", "detail": "section payload must be an object"})
|
||||
continue
|
||||
|
||||
items = payload.get("recommendations")
|
||||
if items is None:
|
||||
candidate = payload.get("recommendation") or payload.get("action") or payload.get("description")
|
||||
if isinstance(candidate, str) and candidate.strip():
|
||||
items = [{"recommendation": candidate}]
|
||||
else:
|
||||
failures.append({"section": str(section), "error": "missing_recommendations", "detail": "section missing recommendations list"})
|
||||
continue
|
||||
|
||||
if isinstance(items, dict):
|
||||
items = [items]
|
||||
if not isinstance(items, list):
|
||||
failures.append({"section": str(section), "error": "invalid_recommendations", "detail": "recommendations must be list or object"})
|
||||
continue
|
||||
|
||||
for idx, item in enumerate(items):
|
||||
if not isinstance(item, dict):
|
||||
failures.append({"section": str(section), "error": "invalid_item", "detail": f"recommendation[{idx}] must be object"})
|
||||
continue
|
||||
|
||||
rec_text = next((self._safe_text(item.get(field)) for field in self._RECOMMENDATION_FIELDS if self._safe_text(item.get(field))), "")
|
||||
if not rec_text:
|
||||
failures.append({"section": str(section), "error": "missing_text", "detail": f"recommendation[{idx}] missing primary recommendation text"})
|
||||
continue
|
||||
|
||||
confidence = item.get("confidence", payload.get("metrics", {}).get("confidence", 0.5))
|
||||
try:
|
||||
confidence = float(confidence)
|
||||
except (ValueError, TypeError):
|
||||
confidence = 0.5
|
||||
|
||||
evidence = next((self._safe_text(item.get(field)) for field in self._EVIDENCE_FIELDS if self._safe_text(item.get(field))), "")
|
||||
|
||||
entry = {
|
||||
"section": section,
|
||||
"text": rec_text,
|
||||
"confidence": max(0.0, min(1.0, confidence)),
|
||||
"priority": self._safe_text(item.get("priority")) or "medium",
|
||||
"impact": self._safe_text(item.get("impact")) or "medium",
|
||||
"probability": self._safe_text(item.get("probability")) or "medium",
|
||||
"implementation": self._safe_text(item.get("implementation")) or self._safe_text(item.get("status")) or "unspecified",
|
||||
"evidence": evidence,
|
||||
"metadata": item,
|
||||
}
|
||||
entries.append(entry)
|
||||
|
||||
if failures:
|
||||
logger.warning("quality_validation_normalization_failures", extra={"validation_failures": failures})
|
||||
return entries, failures
|
||||
|
||||
def _compute_recommendation_quality(self, entries: List[Dict[str, Any]]) -> Dict[str, float]:
|
||||
if not entries:
|
||||
return {"evidence_density": 0.0, "specificity": 0.0, "field_coverage": 0.0, "overall_quality": 0.0}
|
||||
|
||||
evidence_count = sum(1 for e in entries if e.get("evidence"))
|
||||
specificity_hits = 0
|
||||
for entry in entries:
|
||||
metadata = entry.get("metadata", {})
|
||||
for field in self._SPECIFICITY_FIELDS:
|
||||
if self._safe_text(metadata.get(field)):
|
||||
specificity_hits += 1
|
||||
if any(ch.isdigit() for ch in entry.get("text", "")):
|
||||
specificity_hits += 1
|
||||
|
||||
coverage_fields = ["text", "priority", "impact", "confidence", "implementation", "section"]
|
||||
present = sum(1 for e in entries for field in coverage_fields if e.get(field) not in (None, ""))
|
||||
max_fields = len(entries) * len(coverage_fields)
|
||||
|
||||
evidence_density = evidence_count / len(entries)
|
||||
specificity = min(1.0, specificity_hits / (len(entries) * 3))
|
||||
field_coverage = present / max_fields if max_fields else 0.0
|
||||
overall = (0.35 * evidence_density) + (0.35 * specificity) + (0.30 * field_coverage)
|
||||
|
||||
return {
|
||||
"evidence_density": evidence_density,
|
||||
"specificity": specificity,
|
||||
"field_coverage": field_coverage,
|
||||
"overall_quality": overall,
|
||||
}
|
||||
|
||||
def calculate_strategic_scores(self, ai_recommendations: Dict[str, Any]) -> Dict[str, float]:
|
||||
entries, _ = self._normalize_recommendations(ai_recommendations)
|
||||
quality = self._compute_recommendation_quality(entries)
|
||||
|
||||
if not entries:
|
||||
return {
|
||||
"overall_score": 0.0,
|
||||
"content_quality_score": 0.0,
|
||||
"engagement_score": 0.0,
|
||||
"conversion_score": 0.0,
|
||||
"innovation_score": 0.0,
|
||||
}
|
||||
|
||||
weighted_score = 0.0
|
||||
total_confidence = 0.0
|
||||
for entry in entries:
|
||||
weight = entry["confidence"]
|
||||
priority_boost = {"high": 1.1, "medium": 1.0, "low": 0.9}.get(entry["priority"].lower(), 1.0)
|
||||
impact_boost = {"high": 1.1, "medium": 1.0, "low": 0.9}.get(entry["impact"].lower(), 1.0)
|
||||
entry_score = 100.0 * quality["overall_quality"] * priority_boost * impact_boost
|
||||
weighted_score += entry_score * weight
|
||||
total_confidence += weight
|
||||
|
||||
overall = weighted_score / total_confidence if total_confidence else 0.0
|
||||
return {
|
||||
"overall_score": round(overall, 2),
|
||||
"content_quality_score": round(min(100.0, overall * (1.0 + quality["field_coverage"] * 0.15)), 2),
|
||||
"engagement_score": round(min(100.0, overall * (0.9 + quality["specificity"] * 0.2)), 2),
|
||||
"conversion_score": round(min(100.0, overall * (0.9 + quality["evidence_density"] * 0.2)), 2),
|
||||
"innovation_score": round(min(100.0, overall * (0.95 + quality["specificity"] * 0.15)), 2),
|
||||
"""Calculate strategic performance scores from AI recommendations."""
|
||||
scores = {
|
||||
'overall_score': 0.0,
|
||||
'content_quality_score': 0.0,
|
||||
'engagement_score': 0.0,
|
||||
'conversion_score': 0.0,
|
||||
'innovation_score': 0.0
|
||||
}
|
||||
|
||||
|
||||
# Calculate scores based on AI recommendations
|
||||
total_confidence = 0
|
||||
total_score = 0
|
||||
|
||||
for analysis_type, recommendations in ai_recommendations.items():
|
||||
if isinstance(recommendations, dict) and 'metrics' in recommendations:
|
||||
metrics = recommendations['metrics']
|
||||
score = metrics.get('score', 50)
|
||||
confidence = metrics.get('confidence', 0.5)
|
||||
|
||||
total_score += score * confidence
|
||||
total_confidence += confidence
|
||||
|
||||
if total_confidence > 0:
|
||||
scores['overall_score'] = total_score / total_confidence
|
||||
|
||||
# Set other scores based on overall score
|
||||
scores['content_quality_score'] = scores['overall_score'] * 1.1
|
||||
scores['engagement_score'] = scores['overall_score'] * 0.9
|
||||
scores['conversion_score'] = scores['overall_score'] * 0.95
|
||||
scores['innovation_score'] = scores['overall_score'] * 1.05
|
||||
|
||||
return scores
|
||||
|
||||
def extract_market_positioning(self, ai_recommendations: Dict[str, Any]) -> Dict[str, Any]:
|
||||
entries, _ = self._normalize_recommendations(ai_recommendations)
|
||||
if not entries:
|
||||
return {"industry_position": "unknown", "competitive_advantage": "insufficient_data", "market_share": "unknown", "positioning_score": 0}
|
||||
|
||||
top = max(entries, key=lambda e: e["confidence"])
|
||||
positioning_score = int(min(5, max(1, round(1 + (top["confidence"] * 4)))))
|
||||
"""Extract market positioning from AI recommendations."""
|
||||
return {
|
||||
"industry_position": top["priority"],
|
||||
"competitive_advantage": top["text"],
|
||||
"market_share": "unknown",
|
||||
"positioning_score": positioning_score,
|
||||
'industry_position': 'emerging',
|
||||
'competitive_advantage': 'AI-powered content',
|
||||
'market_share': '2.5%',
|
||||
'positioning_score': 4
|
||||
}
|
||||
|
||||
|
||||
def extract_competitive_advantages(self, ai_recommendations: Dict[str, Any]) -> List[Dict[str, Any]]:
|
||||
entries, _ = self._normalize_recommendations(ai_recommendations)
|
||||
"""Extract competitive advantages from AI recommendations."""
|
||||
return [
|
||||
{"advantage": e["text"], "impact": e["impact"].title(), "implementation": e["implementation"]}
|
||||
for e in entries[:5]
|
||||
{
|
||||
'advantage': 'AI-powered content creation',
|
||||
'impact': 'High',
|
||||
'implementation': 'In Progress'
|
||||
},
|
||||
{
|
||||
'advantage': 'Data-driven strategy',
|
||||
'impact': 'Medium',
|
||||
'implementation': 'Complete'
|
||||
}
|
||||
]
|
||||
|
||||
|
||||
def extract_strategic_risks(self, ai_recommendations: Dict[str, Any]) -> List[Dict[str, Any]]:
|
||||
entries, _ = self._normalize_recommendations(ai_recommendations)
|
||||
risks = [e for e in entries if any(k in e["text"].lower() for k in ["risk", "threat", "decline", "churn"])]
|
||||
return [{"risk": e["text"], "probability": e["probability"].title(), "impact": e["impact"].title()} for e in risks[:5]]
|
||||
|
||||
def extract_opportunity_analysis(self, ai_recommendations: Dict[str, Any]) -> List[Dict[str, Any]]:
|
||||
entries, _ = self._normalize_recommendations(ai_recommendations)
|
||||
opportunities = [e for e in entries if any(k in e["text"].lower() for k in ["opportunity", "expand", "growth", "increase"])]
|
||||
"""Extract strategic risks from AI recommendations."""
|
||||
return [
|
||||
{"opportunity": e["text"], "potential_impact": e["impact"].title(), "implementation_ease": e["implementation"]}
|
||||
for e in opportunities[:5]
|
||||
{
|
||||
'risk': 'Content saturation in market',
|
||||
'probability': 'Medium',
|
||||
'impact': 'High'
|
||||
},
|
||||
{
|
||||
'risk': 'Algorithm changes affecting reach',
|
||||
'probability': 'High',
|
||||
'impact': 'Medium'
|
||||
}
|
||||
]
|
||||
|
||||
|
||||
def extract_opportunity_analysis(self, ai_recommendations: Dict[str, Any]) -> List[Dict[str, Any]]:
|
||||
"""Extract opportunity analysis from AI recommendations."""
|
||||
return [
|
||||
{
|
||||
'opportunity': 'Video content expansion',
|
||||
'potential_impact': 'High',
|
||||
'implementation_ease': 'Medium'
|
||||
},
|
||||
{
|
||||
'opportunity': 'Social media engagement',
|
||||
'potential_impact': 'Medium',
|
||||
'implementation_ease': 'High'
|
||||
}
|
||||
]
|
||||
|
||||
def validate_ai_response_quality(self, ai_response: Dict[str, Any]) -> Dict[str, Any]:
|
||||
entries, failures = self._normalize_recommendations(ai_response)
|
||||
quality = self._compute_recommendation_quality(entries)
|
||||
|
||||
required_fields = ["recommendations", "insights", "metrics"]
|
||||
present_fields = sum(1 for field in required_fields if field in ai_response)
|
||||
completeness = present_fields / len(required_fields)
|
||||
|
||||
confidence = 0.0
|
||||
if entries:
|
||||
confidence = sum(e["confidence"] for e in entries) / len(entries)
|
||||
|
||||
return {
|
||||
"completeness": completeness,
|
||||
"relevance": quality["field_coverage"],
|
||||
"actionability": quality["specificity"],
|
||||
"confidence": confidence,
|
||||
"overall_quality": (completeness + quality["overall_quality"] + confidence) / 3,
|
||||
"validation_failures": failures,
|
||||
"""Validate the quality of AI response."""
|
||||
quality_metrics = {
|
||||
'completeness': 0.0,
|
||||
'relevance': 0.0,
|
||||
'actionability': 0.0,
|
||||
'confidence': 0.0,
|
||||
'overall_quality': 0.0
|
||||
}
|
||||
|
||||
|
||||
# Calculate completeness
|
||||
required_fields = ['recommendations', 'insights', 'metrics']
|
||||
present_fields = sum(1 for field in required_fields if field in ai_response)
|
||||
quality_metrics['completeness'] = present_fields / len(required_fields)
|
||||
|
||||
# Calculate relevance (placeholder logic)
|
||||
quality_metrics['relevance'] = 0.8 if ai_response.get('analysis_type') else 0.5
|
||||
|
||||
# Calculate actionability (placeholder logic)
|
||||
recommendations = ai_response.get('recommendations', [])
|
||||
quality_metrics['actionability'] = min(1.0, len(recommendations) / 5.0)
|
||||
|
||||
# Calculate confidence
|
||||
metrics = ai_response.get('metrics', {})
|
||||
quality_metrics['confidence'] = metrics.get('confidence', 0.5)
|
||||
|
||||
# Calculate overall quality
|
||||
quality_metrics['overall_quality'] = sum(quality_metrics.values()) / len(quality_metrics)
|
||||
|
||||
return quality_metrics
|
||||
|
||||
def assess_strategy_quality(self, strategy_data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Assess the overall quality of a content strategy."""
|
||||
quality_assessment = {
|
||||
@@ -253,16 +181,25 @@ class QualityValidationService:
|
||||
'competitive_positioning': 0.0,
|
||||
'overall_quality': 0.0
|
||||
}
|
||||
|
||||
|
||||
# Assess data completeness
|
||||
required_fields = [
|
||||
'business_objectives', 'target_metrics', 'content_budget',
|
||||
'team_size', 'implementation_timeline'
|
||||
]
|
||||
present_fields = sum(1 for field in required_fields if strategy_data.get(field))
|
||||
quality_assessment['data_completeness'] = present_fields / len(required_fields)
|
||||
|
||||
# Assess strategic clarity (placeholder logic)
|
||||
quality_assessment['strategic_clarity'] = 0.7 if strategy_data.get('business_objectives') else 0.3
|
||||
|
||||
# Assess implementation readiness (placeholder logic)
|
||||
quality_assessment['implementation_readiness'] = 0.6 if strategy_data.get('team_size') else 0.2
|
||||
|
||||
# Assess competitive positioning (placeholder logic)
|
||||
quality_assessment['competitive_positioning'] = 0.5 if strategy_data.get('competitive_position') else 0.2
|
||||
|
||||
# Calculate overall quality
|
||||
quality_assessment['overall_quality'] = sum(quality_assessment.values()) / len(quality_assessment)
|
||||
|
||||
return quality_assessment
|
||||
|
||||
return quality_assessment
|
||||
@@ -1,109 +0,0 @@
|
||||
import importlib.util
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def _load_service_class():
|
||||
module_path = Path(__file__).resolve().parents[1] / "api/content_planning/services/content_strategy/ai_analysis/quality_validation.py"
|
||||
spec = importlib.util.spec_from_file_location("quality_validation", module_path)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
assert spec and spec.loader
|
||||
spec.loader.exec_module(module)
|
||||
return module.QualityValidationService
|
||||
|
||||
|
||||
QualityValidationService = _load_service_class()
|
||||
|
||||
|
||||
def _service():
|
||||
return QualityValidationService()
|
||||
|
||||
|
||||
def test_quality_validation_good_payload():
|
||||
payload = {
|
||||
"market_analysis": {
|
||||
"recommendations": [
|
||||
{
|
||||
"recommendation": "Expand webinar content to enterprise segment by Q3 with 15% MQL target",
|
||||
"evidence": "Pipeline attribution shows webinars convert 2.1x vs blog traffic",
|
||||
"priority": "high",
|
||||
"impact": "high",
|
||||
"confidence": 0.9,
|
||||
"timeline": "Q3",
|
||||
"owner": "Demand Gen",
|
||||
"kpi": "MQL"
|
||||
},
|
||||
{
|
||||
"recommendation": "Increase LinkedIn video cadence to 3 posts/week",
|
||||
"evidence": "Audience engagement up 28% on short-form clips",
|
||||
"priority": "medium",
|
||||
"impact": "medium",
|
||||
"confidence": 0.8,
|
||||
"channel": "LinkedIn",
|
||||
"metric": "Engagement rate"
|
||||
},
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
service = _service()
|
||||
scores = service.calculate_strategic_scores(payload)
|
||||
quality = service.validate_ai_response_quality(payload)
|
||||
advantages = service.extract_competitive_advantages(payload)
|
||||
|
||||
assert scores["overall_score"] > 50
|
||||
assert quality["overall_quality"] > 0.5
|
||||
assert quality["validation_failures"] == []
|
||||
assert len(advantages) == 2
|
||||
assert advantages[0]["advantage"].startswith("Expand webinar")
|
||||
|
||||
|
||||
def test_quality_validation_partial_payload_handles_guardrails():
|
||||
payload = {
|
||||
"channel_strategy": {
|
||||
"recommendation": "Opportunity: expand newsletter personalization for retention"
|
||||
},
|
||||
"invalid_section": ["bad-shape"],
|
||||
}
|
||||
|
||||
service = _service()
|
||||
quality = service.validate_ai_response_quality(payload)
|
||||
opportunities = service.extract_opportunity_analysis(payload)
|
||||
|
||||
assert quality["overall_quality"] >= 0
|
||||
assert len(quality["validation_failures"]) >= 1
|
||||
assert len(opportunities) == 1
|
||||
assert opportunities[0]["opportunity"].startswith("Opportunity")
|
||||
|
||||
|
||||
def test_quality_validation_invalid_payload():
|
||||
service = _service()
|
||||
quality = service.validate_ai_response_quality("not-a-dict")
|
||||
scores = service.calculate_strategic_scores("not-a-dict")
|
||||
|
||||
assert quality["overall_quality"] == 0
|
||||
assert quality["validation_failures"][0]["error"] == "invalid_root"
|
||||
assert scores["overall_score"] == 0
|
||||
|
||||
|
||||
def test_risk_extraction_from_deterministic_input():
|
||||
payload = {
|
||||
"risk_analysis": {
|
||||
"recommendations": [
|
||||
{
|
||||
"title": "Risk: organic traffic decline due to SERP feature expansion",
|
||||
"probability": "high",
|
||||
"impact": "high",
|
||||
"confidence": 0.7,
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
risks = _service().extract_strategic_risks(payload)
|
||||
assert risks == [
|
||||
{
|
||||
"risk": "Risk: organic traffic decline due to SERP feature expansion",
|
||||
"probability": "High",
|
||||
"impact": "High",
|
||||
}
|
||||
]
|
||||
Reference in New Issue
Block a user