diff --git a/backend/api/content_planning/services/content_strategy/ai_analysis/quality_validation.py b/backend/api/content_planning/services/content_strategy/ai_analysis/quality_validation.py index 1d140c90..f639d14d 100644 --- a/backend/api/content_planning/services/content_strategy/ai_analysis/quality_validation.py +++ b/backend/api/content_planning/services/content_strategy/ai_analysis/quality_validation.py @@ -4,22 +4,28 @@ AI response quality assessment and strategic analysis. """ import logging -from typing import Dict, Any, List +from typing import Dict, Any, List, Tuple logger = logging.getLogger(__name__) + class QualityValidationService: """Service for quality validation and strategic analysis.""" - + + _RECOMMENDATION_FIELDS = ("recommendation", "title", "action", "description") + _EVIDENCE_FIELDS = ("evidence", "rationale", "reason", "justification", "supporting_data") + _SPECIFICITY_FIELDS = ("owner", "timeline", "kpi", "metric", "target", "channel", "audience") + def __init__(self): pass - + def validate_against_schema(self, data: Dict[str, Any], schema: Dict[str, Any]) -> None: """Validate data against a minimal JSON-like schema definition. Raises ValueError on failure. Schema format example: {"type": "object", "required": ["strategy_brief", "channels"], "properties": {"strategy_brief": {"type": "object"}, "channels": {"type": "array"}}} """ + def _check(node, sch, path="$"): t = sch.get("type") if t == "object": @@ -51,127 +57,193 @@ class QualityValidationService: return else: return + _check(data, schema) - def calculate_strategic_scores(self, ai_recommendations: Dict[str, Any]) -> Dict[str, float]: - """Calculate strategic performance scores from AI recommendations.""" - scores = { - 'overall_score': 0.0, - 'content_quality_score': 0.0, - 'engagement_score': 0.0, - 'conversion_score': 0.0, - 'innovation_score': 0.0 - } - - # Calculate scores based on AI recommendations - total_confidence = 0 - total_score = 0 - - for analysis_type, recommendations in ai_recommendations.items(): - if isinstance(recommendations, dict) and 'metrics' in recommendations: - metrics = recommendations['metrics'] - score = metrics.get('score', 50) - confidence = metrics.get('confidence', 0.5) - - total_score += score * confidence - total_confidence += confidence - - if total_confidence > 0: - scores['overall_score'] = total_score / total_confidence - - # Set other scores based on overall score - scores['content_quality_score'] = scores['overall_score'] * 1.1 - scores['engagement_score'] = scores['overall_score'] * 0.9 - scores['conversion_score'] = scores['overall_score'] * 0.95 - scores['innovation_score'] = scores['overall_score'] * 1.05 - - return scores - - def extract_market_positioning(self, ai_recommendations: Dict[str, Any]) -> Dict[str, Any]: - """Extract market positioning from AI recommendations.""" + def _safe_text(self, value: Any) -> str: + return value.strip() if isinstance(value, str) else "" + + def _normalize_recommendations(self, ai_recommendations: Dict[str, Any]) -> Tuple[List[Dict[str, Any]], List[Dict[str, str]]]: + """Flatten heterogeneous AI payload into normalized recommendation entries.""" + entries: List[Dict[str, Any]] = [] + failures: List[Dict[str, str]] = [] + + if not isinstance(ai_recommendations, dict): + failures.append({"error": "invalid_root", "detail": "ai_recommendations must be a dictionary"}) + return entries, failures + + for section, payload in ai_recommendations.items(): + if not isinstance(payload, dict): + failures.append({"section": str(section), "error": "invalid_section", "detail": "section payload must be an object"}) + continue + + items = payload.get("recommendations") + if items is None: + candidate = payload.get("recommendation") or payload.get("action") or payload.get("description") + if isinstance(candidate, str) and candidate.strip(): + items = [{"recommendation": candidate}] + else: + failures.append({"section": str(section), "error": "missing_recommendations", "detail": "section missing recommendations list"}) + continue + + if isinstance(items, dict): + items = [items] + if not isinstance(items, list): + failures.append({"section": str(section), "error": "invalid_recommendations", "detail": "recommendations must be list or object"}) + continue + + for idx, item in enumerate(items): + if not isinstance(item, dict): + failures.append({"section": str(section), "error": "invalid_item", "detail": f"recommendation[{idx}] must be object"}) + continue + + rec_text = next((self._safe_text(item.get(field)) for field in self._RECOMMENDATION_FIELDS if self._safe_text(item.get(field))), "") + if not rec_text: + failures.append({"section": str(section), "error": "missing_text", "detail": f"recommendation[{idx}] missing primary recommendation text"}) + continue + + confidence = item.get("confidence", payload.get("metrics", {}).get("confidence", 0.5)) + try: + confidence = float(confidence) + except (ValueError, TypeError): + confidence = 0.5 + + evidence = next((self._safe_text(item.get(field)) for field in self._EVIDENCE_FIELDS if self._safe_text(item.get(field))), "") + + entry = { + "section": section, + "text": rec_text, + "confidence": max(0.0, min(1.0, confidence)), + "priority": self._safe_text(item.get("priority")) or "medium", + "impact": self._safe_text(item.get("impact")) or "medium", + "probability": self._safe_text(item.get("probability")) or "medium", + "implementation": self._safe_text(item.get("implementation")) or self._safe_text(item.get("status")) or "unspecified", + "evidence": evidence, + "metadata": item, + } + entries.append(entry) + + if failures: + logger.warning("quality_validation_normalization_failures", extra={"validation_failures": failures}) + return entries, failures + + def _compute_recommendation_quality(self, entries: List[Dict[str, Any]]) -> Dict[str, float]: + if not entries: + return {"evidence_density": 0.0, "specificity": 0.0, "field_coverage": 0.0, "overall_quality": 0.0} + + evidence_count = sum(1 for e in entries if e.get("evidence")) + specificity_hits = 0 + for entry in entries: + metadata = entry.get("metadata", {}) + for field in self._SPECIFICITY_FIELDS: + if self._safe_text(metadata.get(field)): + specificity_hits += 1 + if any(ch.isdigit() for ch in entry.get("text", "")): + specificity_hits += 1 + + coverage_fields = ["text", "priority", "impact", "confidence", "implementation", "section"] + present = sum(1 for e in entries for field in coverage_fields if e.get(field) not in (None, "")) + max_fields = len(entries) * len(coverage_fields) + + evidence_density = evidence_count / len(entries) + specificity = min(1.0, specificity_hits / (len(entries) * 3)) + field_coverage = present / max_fields if max_fields else 0.0 + overall = (0.35 * evidence_density) + (0.35 * specificity) + (0.30 * field_coverage) + return { - 'industry_position': 'emerging', - 'competitive_advantage': 'AI-powered content', - 'market_share': '2.5%', - 'positioning_score': 4 + "evidence_density": evidence_density, + "specificity": specificity, + "field_coverage": field_coverage, + "overall_quality": overall, } - + + def calculate_strategic_scores(self, ai_recommendations: Dict[str, Any]) -> Dict[str, float]: + entries, _ = self._normalize_recommendations(ai_recommendations) + quality = self._compute_recommendation_quality(entries) + + if not entries: + return { + "overall_score": 0.0, + "content_quality_score": 0.0, + "engagement_score": 0.0, + "conversion_score": 0.0, + "innovation_score": 0.0, + } + + weighted_score = 0.0 + total_confidence = 0.0 + for entry in entries: + weight = entry["confidence"] + priority_boost = {"high": 1.1, "medium": 1.0, "low": 0.9}.get(entry["priority"].lower(), 1.0) + impact_boost = {"high": 1.1, "medium": 1.0, "low": 0.9}.get(entry["impact"].lower(), 1.0) + entry_score = 100.0 * quality["overall_quality"] * priority_boost * impact_boost + weighted_score += entry_score * weight + total_confidence += weight + + overall = weighted_score / total_confidence if total_confidence else 0.0 + return { + "overall_score": round(overall, 2), + "content_quality_score": round(min(100.0, overall * (1.0 + quality["field_coverage"] * 0.15)), 2), + "engagement_score": round(min(100.0, overall * (0.9 + quality["specificity"] * 0.2)), 2), + "conversion_score": round(min(100.0, overall * (0.9 + quality["evidence_density"] * 0.2)), 2), + "innovation_score": round(min(100.0, overall * (0.95 + quality["specificity"] * 0.15)), 2), + } + + def extract_market_positioning(self, ai_recommendations: Dict[str, Any]) -> Dict[str, Any]: + entries, _ = self._normalize_recommendations(ai_recommendations) + if not entries: + return {"industry_position": "unknown", "competitive_advantage": "insufficient_data", "market_share": "unknown", "positioning_score": 0} + + top = max(entries, key=lambda e: e["confidence"]) + positioning_score = int(min(5, max(1, round(1 + (top["confidence"] * 4))))) + return { + "industry_position": top["priority"], + "competitive_advantage": top["text"], + "market_share": "unknown", + "positioning_score": positioning_score, + } + def extract_competitive_advantages(self, ai_recommendations: Dict[str, Any]) -> List[Dict[str, Any]]: - """Extract competitive advantages from AI recommendations.""" + entries, _ = self._normalize_recommendations(ai_recommendations) return [ - { - 'advantage': 'AI-powered content creation', - 'impact': 'High', - 'implementation': 'In Progress' - }, - { - 'advantage': 'Data-driven strategy', - 'impact': 'Medium', - 'implementation': 'Complete' - } + {"advantage": e["text"], "impact": e["impact"].title(), "implementation": e["implementation"]} + for e in entries[:5] ] - + def extract_strategic_risks(self, ai_recommendations: Dict[str, Any]) -> List[Dict[str, Any]]: - """Extract strategic risks from AI recommendations.""" - return [ - { - 'risk': 'Content saturation in market', - 'probability': 'Medium', - 'impact': 'High' - }, - { - 'risk': 'Algorithm changes affecting reach', - 'probability': 'High', - 'impact': 'Medium' - } - ] - + entries, _ = self._normalize_recommendations(ai_recommendations) + risks = [e for e in entries if any(k in e["text"].lower() for k in ["risk", "threat", "decline", "churn"])] + return [{"risk": e["text"], "probability": e["probability"].title(), "impact": e["impact"].title()} for e in risks[:5]] + def extract_opportunity_analysis(self, ai_recommendations: Dict[str, Any]) -> List[Dict[str, Any]]: - """Extract opportunity analysis from AI recommendations.""" + entries, _ = self._normalize_recommendations(ai_recommendations) + opportunities = [e for e in entries if any(k in e["text"].lower() for k in ["opportunity", "expand", "growth", "increase"])] return [ - { - 'opportunity': 'Video content expansion', - 'potential_impact': 'High', - 'implementation_ease': 'Medium' - }, - { - 'opportunity': 'Social media engagement', - 'potential_impact': 'Medium', - 'implementation_ease': 'High' - } + {"opportunity": e["text"], "potential_impact": e["impact"].title(), "implementation_ease": e["implementation"]} + for e in opportunities[:5] ] - + def validate_ai_response_quality(self, ai_response: Dict[str, Any]) -> Dict[str, Any]: - """Validate the quality of AI response.""" - quality_metrics = { - 'completeness': 0.0, - 'relevance': 0.0, - 'actionability': 0.0, - 'confidence': 0.0, - 'overall_quality': 0.0 - } - - # Calculate completeness - required_fields = ['recommendations', 'insights', 'metrics'] + entries, failures = self._normalize_recommendations(ai_response) + quality = self._compute_recommendation_quality(entries) + + required_fields = ["recommendations", "insights", "metrics"] present_fields = sum(1 for field in required_fields if field in ai_response) - quality_metrics['completeness'] = present_fields / len(required_fields) - - # Calculate relevance (placeholder logic) - quality_metrics['relevance'] = 0.8 if ai_response.get('analysis_type') else 0.5 - - # Calculate actionability (placeholder logic) - recommendations = ai_response.get('recommendations', []) - quality_metrics['actionability'] = min(1.0, len(recommendations) / 5.0) - - # Calculate confidence - metrics = ai_response.get('metrics', {}) - quality_metrics['confidence'] = metrics.get('confidence', 0.5) - - # Calculate overall quality - quality_metrics['overall_quality'] = sum(quality_metrics.values()) / len(quality_metrics) - - return quality_metrics - + completeness = present_fields / len(required_fields) + + confidence = 0.0 + if entries: + confidence = sum(e["confidence"] for e in entries) / len(entries) + + return { + "completeness": completeness, + "relevance": quality["field_coverage"], + "actionability": quality["specificity"], + "confidence": confidence, + "overall_quality": (completeness + quality["overall_quality"] + confidence) / 3, + "validation_failures": failures, + } + def assess_strategy_quality(self, strategy_data: Dict[str, Any]) -> Dict[str, Any]: """Assess the overall quality of a content strategy.""" quality_assessment = { @@ -181,25 +253,16 @@ class QualityValidationService: 'competitive_positioning': 0.0, 'overall_quality': 0.0 } - - # Assess data completeness + required_fields = [ 'business_objectives', 'target_metrics', 'content_budget', 'team_size', 'implementation_timeline' ] present_fields = sum(1 for field in required_fields if strategy_data.get(field)) quality_assessment['data_completeness'] = present_fields / len(required_fields) - - # Assess strategic clarity (placeholder logic) quality_assessment['strategic_clarity'] = 0.7 if strategy_data.get('business_objectives') else 0.3 - - # Assess implementation readiness (placeholder logic) quality_assessment['implementation_readiness'] = 0.6 if strategy_data.get('team_size') else 0.2 - - # Assess competitive positioning (placeholder logic) quality_assessment['competitive_positioning'] = 0.5 if strategy_data.get('competitive_position') else 0.2 - - # Calculate overall quality quality_assessment['overall_quality'] = sum(quality_assessment.values()) / len(quality_assessment) - - return quality_assessment \ No newline at end of file + + return quality_assessment diff --git a/backend/tests/test_quality_validation_service.py b/backend/tests/test_quality_validation_service.py new file mode 100644 index 00000000..5c1eda72 --- /dev/null +++ b/backend/tests/test_quality_validation_service.py @@ -0,0 +1,109 @@ +import importlib.util +from pathlib import Path + + +def _load_service_class(): + module_path = Path(__file__).resolve().parents[1] / "api/content_planning/services/content_strategy/ai_analysis/quality_validation.py" + spec = importlib.util.spec_from_file_location("quality_validation", module_path) + module = importlib.util.module_from_spec(spec) + assert spec and spec.loader + spec.loader.exec_module(module) + return module.QualityValidationService + + +QualityValidationService = _load_service_class() + + +def _service(): + return QualityValidationService() + + +def test_quality_validation_good_payload(): + payload = { + "market_analysis": { + "recommendations": [ + { + "recommendation": "Expand webinar content to enterprise segment by Q3 with 15% MQL target", + "evidence": "Pipeline attribution shows webinars convert 2.1x vs blog traffic", + "priority": "high", + "impact": "high", + "confidence": 0.9, + "timeline": "Q3", + "owner": "Demand Gen", + "kpi": "MQL" + }, + { + "recommendation": "Increase LinkedIn video cadence to 3 posts/week", + "evidence": "Audience engagement up 28% on short-form clips", + "priority": "medium", + "impact": "medium", + "confidence": 0.8, + "channel": "LinkedIn", + "metric": "Engagement rate" + }, + ] + } + } + + service = _service() + scores = service.calculate_strategic_scores(payload) + quality = service.validate_ai_response_quality(payload) + advantages = service.extract_competitive_advantages(payload) + + assert scores["overall_score"] > 50 + assert quality["overall_quality"] > 0.5 + assert quality["validation_failures"] == [] + assert len(advantages) == 2 + assert advantages[0]["advantage"].startswith("Expand webinar") + + +def test_quality_validation_partial_payload_handles_guardrails(): + payload = { + "channel_strategy": { + "recommendation": "Opportunity: expand newsletter personalization for retention" + }, + "invalid_section": ["bad-shape"], + } + + service = _service() + quality = service.validate_ai_response_quality(payload) + opportunities = service.extract_opportunity_analysis(payload) + + assert quality["overall_quality"] >= 0 + assert len(quality["validation_failures"]) >= 1 + assert len(opportunities) == 1 + assert opportunities[0]["opportunity"].startswith("Opportunity") + + +def test_quality_validation_invalid_payload(): + service = _service() + quality = service.validate_ai_response_quality("not-a-dict") + scores = service.calculate_strategic_scores("not-a-dict") + + assert quality["overall_quality"] == 0 + assert quality["validation_failures"][0]["error"] == "invalid_root" + assert scores["overall_score"] == 0 + + +def test_risk_extraction_from_deterministic_input(): + payload = { + "risk_analysis": { + "recommendations": [ + { + "title": "Risk: organic traffic decline due to SERP feature expansion", + "probability": "high", + "impact": "high", + "confidence": 0.7, + } + ] + } + } + + risks = _service().extract_strategic_risks(payload) + assert risks == [ + { + "risk": "Risk: organic traffic decline due to SERP feature expansion", + "probability": "High", + "impact": "High", + } + ]