Compare commits

...

1 Commits

Author SHA1 Message Date
ي
ea0a7ec34e Implement deterministic quality evaluators for AI recommendations 2026-05-28 09:17:32 +05:30
2 changed files with 295 additions and 123 deletions

View File

@@ -4,13 +4,18 @@ AI response quality assessment and strategic analysis.
""" """
import logging import logging
from typing import Dict, Any, List from typing import Dict, Any, List, Tuple
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class QualityValidationService: class QualityValidationService:
"""Service for quality validation and strategic analysis.""" """Service for quality validation and strategic analysis."""
_RECOMMENDATION_FIELDS = ("recommendation", "title", "action", "description")
_EVIDENCE_FIELDS = ("evidence", "rationale", "reason", "justification", "supporting_data")
_SPECIFICITY_FIELDS = ("owner", "timeline", "kpi", "metric", "target", "channel", "audience")
def __init__(self): def __init__(self):
pass pass
@@ -20,6 +25,7 @@ class QualityValidationService:
Schema format example: Schema format example:
{"type": "object", "required": ["strategy_brief", "channels"], "properties": {"strategy_brief": {"type": "object"}, "channels": {"type": "array"}}} {"type": "object", "required": ["strategy_brief", "channels"], "properties": {"strategy_brief": {"type": "object"}, "channels": {"type": "array"}}}
""" """
def _check(node, sch, path="$"): def _check(node, sch, path="$"):
t = sch.get("type") t = sch.get("type")
if t == "object": if t == "object":
@@ -51,126 +57,192 @@ class QualityValidationService:
return return
else: else:
return return
_check(data, schema) _check(data, schema)
def calculate_strategic_scores(self, ai_recommendations: Dict[str, Any]) -> Dict[str, float]: def _safe_text(self, value: Any) -> str:
"""Calculate strategic performance scores from AI recommendations.""" return value.strip() if isinstance(value, str) else ""
scores = {
'overall_score': 0.0, def _normalize_recommendations(self, ai_recommendations: Dict[str, Any]) -> Tuple[List[Dict[str, Any]], List[Dict[str, str]]]:
'content_quality_score': 0.0, """Flatten heterogeneous AI payload into normalized recommendation entries."""
'engagement_score': 0.0, entries: List[Dict[str, Any]] = []
'conversion_score': 0.0, failures: List[Dict[str, str]] = []
'innovation_score': 0.0
if not isinstance(ai_recommendations, dict):
failures.append({"error": "invalid_root", "detail": "ai_recommendations must be a dictionary"})
return entries, failures
for section, payload in ai_recommendations.items():
if not isinstance(payload, dict):
failures.append({"section": str(section), "error": "invalid_section", "detail": "section payload must be an object"})
continue
items = payload.get("recommendations")
if items is None:
candidate = payload.get("recommendation") or payload.get("action") or payload.get("description")
if isinstance(candidate, str) and candidate.strip():
items = [{"recommendation": candidate}]
else:
failures.append({"section": str(section), "error": "missing_recommendations", "detail": "section missing recommendations list"})
continue
if isinstance(items, dict):
items = [items]
if not isinstance(items, list):
failures.append({"section": str(section), "error": "invalid_recommendations", "detail": "recommendations must be list or object"})
continue
for idx, item in enumerate(items):
if not isinstance(item, dict):
failures.append({"section": str(section), "error": "invalid_item", "detail": f"recommendation[{idx}] must be object"})
continue
rec_text = next((self._safe_text(item.get(field)) for field in self._RECOMMENDATION_FIELDS if self._safe_text(item.get(field))), "")
if not rec_text:
failures.append({"section": str(section), "error": "missing_text", "detail": f"recommendation[{idx}] missing primary recommendation text"})
continue
confidence = item.get("confidence", payload.get("metrics", {}).get("confidence", 0.5))
try:
confidence = float(confidence)
except (ValueError, TypeError):
confidence = 0.5
evidence = next((self._safe_text(item.get(field)) for field in self._EVIDENCE_FIELDS if self._safe_text(item.get(field))), "")
entry = {
"section": section,
"text": rec_text,
"confidence": max(0.0, min(1.0, confidence)),
"priority": self._safe_text(item.get("priority")) or "medium",
"impact": self._safe_text(item.get("impact")) or "medium",
"probability": self._safe_text(item.get("probability")) or "medium",
"implementation": self._safe_text(item.get("implementation")) or self._safe_text(item.get("status")) or "unspecified",
"evidence": evidence,
"metadata": item,
}
entries.append(entry)
if failures:
logger.warning("quality_validation_normalization_failures", extra={"validation_failures": failures})
return entries, failures
def _compute_recommendation_quality(self, entries: List[Dict[str, Any]]) -> Dict[str, float]:
if not entries:
return {"evidence_density": 0.0, "specificity": 0.0, "field_coverage": 0.0, "overall_quality": 0.0}
evidence_count = sum(1 for e in entries if e.get("evidence"))
specificity_hits = 0
for entry in entries:
metadata = entry.get("metadata", {})
for field in self._SPECIFICITY_FIELDS:
if self._safe_text(metadata.get(field)):
specificity_hits += 1
if any(ch.isdigit() for ch in entry.get("text", "")):
specificity_hits += 1
coverage_fields = ["text", "priority", "impact", "confidence", "implementation", "section"]
present = sum(1 for e in entries for field in coverage_fields if e.get(field) not in (None, ""))
max_fields = len(entries) * len(coverage_fields)
evidence_density = evidence_count / len(entries)
specificity = min(1.0, specificity_hits / (len(entries) * 3))
field_coverage = present / max_fields if max_fields else 0.0
overall = (0.35 * evidence_density) + (0.35 * specificity) + (0.30 * field_coverage)
return {
"evidence_density": evidence_density,
"specificity": specificity,
"field_coverage": field_coverage,
"overall_quality": overall,
} }
# Calculate scores based on AI recommendations def calculate_strategic_scores(self, ai_recommendations: Dict[str, Any]) -> Dict[str, float]:
total_confidence = 0 entries, _ = self._normalize_recommendations(ai_recommendations)
total_score = 0 quality = self._compute_recommendation_quality(entries)
for analysis_type, recommendations in ai_recommendations.items(): if not entries:
if isinstance(recommendations, dict) and 'metrics' in recommendations: return {
metrics = recommendations['metrics'] "overall_score": 0.0,
score = metrics.get('score', 50) "content_quality_score": 0.0,
confidence = metrics.get('confidence', 0.5) "engagement_score": 0.0,
"conversion_score": 0.0,
"innovation_score": 0.0,
}
total_score += score * confidence weighted_score = 0.0
total_confidence += confidence total_confidence = 0.0
for entry in entries:
weight = entry["confidence"]
priority_boost = {"high": 1.1, "medium": 1.0, "low": 0.9}.get(entry["priority"].lower(), 1.0)
impact_boost = {"high": 1.1, "medium": 1.0, "low": 0.9}.get(entry["impact"].lower(), 1.0)
entry_score = 100.0 * quality["overall_quality"] * priority_boost * impact_boost
weighted_score += entry_score * weight
total_confidence += weight
if total_confidence > 0: overall = weighted_score / total_confidence if total_confidence else 0.0
scores['overall_score'] = total_score / total_confidence return {
"overall_score": round(overall, 2),
# Set other scores based on overall score "content_quality_score": round(min(100.0, overall * (1.0 + quality["field_coverage"] * 0.15)), 2),
scores['content_quality_score'] = scores['overall_score'] * 1.1 "engagement_score": round(min(100.0, overall * (0.9 + quality["specificity"] * 0.2)), 2),
scores['engagement_score'] = scores['overall_score'] * 0.9 "conversion_score": round(min(100.0, overall * (0.9 + quality["evidence_density"] * 0.2)), 2),
scores['conversion_score'] = scores['overall_score'] * 0.95 "innovation_score": round(min(100.0, overall * (0.95 + quality["specificity"] * 0.15)), 2),
scores['innovation_score'] = scores['overall_score'] * 1.05 }
return scores
def extract_market_positioning(self, ai_recommendations: Dict[str, Any]) -> Dict[str, Any]: def extract_market_positioning(self, ai_recommendations: Dict[str, Any]) -> Dict[str, Any]:
"""Extract market positioning from AI recommendations.""" entries, _ = self._normalize_recommendations(ai_recommendations)
if not entries:
return {"industry_position": "unknown", "competitive_advantage": "insufficient_data", "market_share": "unknown", "positioning_score": 0}
top = max(entries, key=lambda e: e["confidence"])
positioning_score = int(min(5, max(1, round(1 + (top["confidence"] * 4)))))
return { return {
'industry_position': 'emerging', "industry_position": top["priority"],
'competitive_advantage': 'AI-powered content', "competitive_advantage": top["text"],
'market_share': '2.5%', "market_share": "unknown",
'positioning_score': 4 "positioning_score": positioning_score,
} }
def extract_competitive_advantages(self, ai_recommendations: Dict[str, Any]) -> List[Dict[str, Any]]: def extract_competitive_advantages(self, ai_recommendations: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Extract competitive advantages from AI recommendations.""" entries, _ = self._normalize_recommendations(ai_recommendations)
return [ return [
{ {"advantage": e["text"], "impact": e["impact"].title(), "implementation": e["implementation"]}
'advantage': 'AI-powered content creation', for e in entries[:5]
'impact': 'High',
'implementation': 'In Progress'
},
{
'advantage': 'Data-driven strategy',
'impact': 'Medium',
'implementation': 'Complete'
}
] ]
def extract_strategic_risks(self, ai_recommendations: Dict[str, Any]) -> List[Dict[str, Any]]: def extract_strategic_risks(self, ai_recommendations: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Extract strategic risks from AI recommendations.""" entries, _ = self._normalize_recommendations(ai_recommendations)
return [ risks = [e for e in entries if any(k in e["text"].lower() for k in ["risk", "threat", "decline", "churn"])]
{ return [{"risk": e["text"], "probability": e["probability"].title(), "impact": e["impact"].title()} for e in risks[:5]]
'risk': 'Content saturation in market',
'probability': 'Medium',
'impact': 'High'
},
{
'risk': 'Algorithm changes affecting reach',
'probability': 'High',
'impact': 'Medium'
}
]
def extract_opportunity_analysis(self, ai_recommendations: Dict[str, Any]) -> List[Dict[str, Any]]: def extract_opportunity_analysis(self, ai_recommendations: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Extract opportunity analysis from AI recommendations.""" entries, _ = self._normalize_recommendations(ai_recommendations)
opportunities = [e for e in entries if any(k in e["text"].lower() for k in ["opportunity", "expand", "growth", "increase"])]
return [ return [
{ {"opportunity": e["text"], "potential_impact": e["impact"].title(), "implementation_ease": e["implementation"]}
'opportunity': 'Video content expansion', for e in opportunities[:5]
'potential_impact': 'High',
'implementation_ease': 'Medium'
},
{
'opportunity': 'Social media engagement',
'potential_impact': 'Medium',
'implementation_ease': 'High'
}
] ]
def validate_ai_response_quality(self, ai_response: Dict[str, Any]) -> Dict[str, Any]: def validate_ai_response_quality(self, ai_response: Dict[str, Any]) -> Dict[str, Any]:
"""Validate the quality of AI response.""" entries, failures = self._normalize_recommendations(ai_response)
quality_metrics = { quality = self._compute_recommendation_quality(entries)
'completeness': 0.0,
'relevance': 0.0,
'actionability': 0.0,
'confidence': 0.0,
'overall_quality': 0.0
}
# Calculate completeness required_fields = ["recommendations", "insights", "metrics"]
required_fields = ['recommendations', 'insights', 'metrics']
present_fields = sum(1 for field in required_fields if field in ai_response) present_fields = sum(1 for field in required_fields if field in ai_response)
quality_metrics['completeness'] = present_fields / len(required_fields) completeness = present_fields / len(required_fields)
# Calculate relevance (placeholder logic) confidence = 0.0
quality_metrics['relevance'] = 0.8 if ai_response.get('analysis_type') else 0.5 if entries:
confidence = sum(e["confidence"] for e in entries) / len(entries)
# Calculate actionability (placeholder logic) return {
recommendations = ai_response.get('recommendations', []) "completeness": completeness,
quality_metrics['actionability'] = min(1.0, len(recommendations) / 5.0) "relevance": quality["field_coverage"],
"actionability": quality["specificity"],
# Calculate confidence "confidence": confidence,
metrics = ai_response.get('metrics', {}) "overall_quality": (completeness + quality["overall_quality"] + confidence) / 3,
quality_metrics['confidence'] = metrics.get('confidence', 0.5) "validation_failures": failures,
}
# Calculate overall quality
quality_metrics['overall_quality'] = sum(quality_metrics.values()) / len(quality_metrics)
return quality_metrics
def assess_strategy_quality(self, strategy_data: Dict[str, Any]) -> Dict[str, Any]: def assess_strategy_quality(self, strategy_data: Dict[str, Any]) -> Dict[str, Any]:
"""Assess the overall quality of a content strategy.""" """Assess the overall quality of a content strategy."""
@@ -182,24 +254,15 @@ class QualityValidationService:
'overall_quality': 0.0 'overall_quality': 0.0
} }
# Assess data completeness
required_fields = [ required_fields = [
'business_objectives', 'target_metrics', 'content_budget', 'business_objectives', 'target_metrics', 'content_budget',
'team_size', 'implementation_timeline' 'team_size', 'implementation_timeline'
] ]
present_fields = sum(1 for field in required_fields if strategy_data.get(field)) present_fields = sum(1 for field in required_fields if strategy_data.get(field))
quality_assessment['data_completeness'] = present_fields / len(required_fields) quality_assessment['data_completeness'] = present_fields / len(required_fields)
# Assess strategic clarity (placeholder logic)
quality_assessment['strategic_clarity'] = 0.7 if strategy_data.get('business_objectives') else 0.3 quality_assessment['strategic_clarity'] = 0.7 if strategy_data.get('business_objectives') else 0.3
# Assess implementation readiness (placeholder logic)
quality_assessment['implementation_readiness'] = 0.6 if strategy_data.get('team_size') else 0.2 quality_assessment['implementation_readiness'] = 0.6 if strategy_data.get('team_size') else 0.2
# Assess competitive positioning (placeholder logic)
quality_assessment['competitive_positioning'] = 0.5 if strategy_data.get('competitive_position') else 0.2 quality_assessment['competitive_positioning'] = 0.5 if strategy_data.get('competitive_position') else 0.2
# Calculate overall quality
quality_assessment['overall_quality'] = sum(quality_assessment.values()) / len(quality_assessment) quality_assessment['overall_quality'] = sum(quality_assessment.values()) / len(quality_assessment)
return quality_assessment return quality_assessment

View File

@@ -0,0 +1,109 @@
import importlib.util
from pathlib import Path
def _load_service_class():
    """Load ``QualityValidationService`` directly from its source file.

    The module path is built relative to this file (the parent of the
    directory that contains it) and the module is executed through
    ``importlib`` under the name ``quality_validation`` rather than being
    imported as a package member.

    Returns:
        The ``QualityValidationService`` class defined by the loaded module.

    Raises:
        ImportError: If no import spec or loader can be created for the path.
    """
    module_path = (
        Path(__file__).resolve().parents[1]
        / "api/content_planning/services/content_strategy/ai_analysis/quality_validation.py"
    )
    spec = importlib.util.spec_from_file_location("quality_validation", module_path)
    # Validate before use: module_from_spec(None) would fail with an opaque
    # AttributeError, and a bare assert is stripped when running under -O.
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load quality_validation module from {module_path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module.QualityValidationService
# Resolve the service class once, at import time, so every test below uses the same class object.
QualityValidationService = _load_service_class()
def _service():
    """Build a new ``QualityValidationService`` with no constructor arguments."""
    fresh_instance = QualityValidationService()
    return fresh_instance
def test_quality_validation_good_payload():
    """A well-formed section with two recommendations passes the score, quality and extraction checks."""
    webinar_rec = {
        "recommendation": "Expand webinar content to enterprise segment by Q3 with 15% MQL target",
        "evidence": "Pipeline attribution shows webinars convert 2.1x vs blog traffic",
        "priority": "high",
        "impact": "high",
        "confidence": 0.9,
        "timeline": "Q3",
        "owner": "Demand Gen",
        "kpi": "MQL",
    }
    linkedin_rec = {
        "recommendation": "Increase LinkedIn video cadence to 3 posts/week",
        "evidence": "Audience engagement up 28% on short-form clips",
        "priority": "medium",
        "impact": "medium",
        "confidence": 0.8,
        "channel": "LinkedIn",
        "metric": "Engagement rate",
    }
    good_payload = {"market_analysis": {"recommendations": [webinar_rec, linkedin_rec]}}

    # Run all three evaluators up front, then check their outputs.
    svc = _service()
    strategic_scores = svc.calculate_strategic_scores(good_payload)
    quality_report = svc.validate_ai_response_quality(good_payload)
    top_advantages = svc.extract_competitive_advantages(good_payload)

    assert strategic_scores["overall_score"] > 50
    assert quality_report["overall_quality"] > 0.5
    assert quality_report["validation_failures"] == []
    assert len(top_advantages) == 2
    assert top_advantages[0]["advantage"].startswith("Expand webinar")
def test_quality_validation_partial_payload_handles_guardrails():
    """A usable section next to a malformed one still yields an opportunity and reports a failure."""
    usable_section = {
        "recommendation": "Opportunity: expand newsletter personalization for retention"
    }
    # The second section is a list rather than an object.
    mixed_payload = {
        "channel_strategy": usable_section,
        "invalid_section": ["bad-shape"],
    }

    svc = _service()
    quality_report = svc.validate_ai_response_quality(mixed_payload)
    found_opportunities = svc.extract_opportunity_analysis(mixed_payload)

    assert quality_report["overall_quality"] >= 0
    assert len(quality_report["validation_failures"]) >= 1
    assert len(found_opportunities) == 1
    assert found_opportunities[0]["opportunity"].startswith("Opportunity")
def test_quality_validation_invalid_payload():
    """A non-dictionary root produces zero scores and an ``invalid_root`` failure."""
    bad_root = "not-a-dict"
    svc = _service()
    quality_report = svc.validate_ai_response_quality(bad_root)
    strategic_scores = svc.calculate_strategic_scores(bad_root)

    assert quality_report["overall_quality"] == 0
    first_failure = quality_report["validation_failures"][0]
    assert first_failure["error"] == "invalid_root"
    assert strategic_scores["overall_score"] == 0
def test_risk_extraction_from_deterministic_input():
    """A single risk-worded recommendation is returned as one title-cased risk record."""
    risk_item = {
        "title": "Risk: organic traffic decline due to SERP feature expansion",
        "probability": "high",
        "impact": "high",
        "confidence": 0.7,
    }
    risk_payload = {"risk_analysis": {"recommendations": [risk_item]}}

    extracted = _service().extract_strategic_risks(risk_payload)

    expected_record = {
        "risk": "Risk: organic traffic decline due to SERP feature expansion",
        "probability": "High",
        "impact": "High",
    }
    assert extracted == [expected_record]