Files
ALwrity/backend/services/ai_quality_analysis_service.py
2025-08-19 21:48:33 +05:30

612 lines
26 KiB
Python

"""
AI Quality Analysis Service
Provides AI-powered quality assessment and recommendations for content strategies.
"""
import logging
import asyncio
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass
from enum import Enum
from services.llm_providers.gemini_provider import gemini_structured_json_response
from services.strategy_service import StrategyService
from models.enhanced_strategy_models import EnhancedContentStrategy
logger = logging.getLogger(__name__)
class QualityScore(Enum):
EXCELLENT = "excellent"
GOOD = "good"
NEEDS_ATTENTION = "needs_attention"
POOR = "poor"
@dataclass
class QualityMetric:
name: str
score: float # 0-100
weight: float # 0-1
status: QualityScore
description: str
recommendations: List[str]
@dataclass
class QualityAnalysisResult:
overall_score: float
overall_status: QualityScore
metrics: List[QualityMetric]
recommendations: List[str]
confidence_score: float
analysis_timestamp: datetime
strategy_id: int
# Structured JSON schemas for Gemini API
QUALITY_ANALYSIS_SCHEMA = {
"type": "OBJECT",
"properties": {
"score": {"type": "NUMBER"},
"status": {"type": "STRING"},
"description": {"type": "STRING"},
"recommendations": {
"type": "ARRAY",
"items": {"type": "STRING"}
}
},
"propertyOrdering": ["score", "status", "description", "recommendations"]
}
RECOMMENDATIONS_SCHEMA = {
"type": "OBJECT",
"properties": {
"recommendations": {
"type": "ARRAY",
"items": {"type": "STRING"}
},
"priority_areas": {
"type": "ARRAY",
"items": {"type": "STRING"}
}
},
"propertyOrdering": ["recommendations", "priority_areas"]
}
class AIQualityAnalysisService:
"""AI-powered quality assessment service for content strategies."""
def __init__(self):
self.strategy_service = StrategyService()
async def analyze_strategy_quality(self, strategy_id: int) -> QualityAnalysisResult:
"""Analyze strategy quality using AI and return comprehensive results."""
try:
logger.info(f"Starting AI quality analysis for strategy {strategy_id}")
# Get strategy data
strategy_data = await self.strategy_service.get_strategy_by_id(strategy_id)
if not strategy_data:
raise ValueError(f"Strategy {strategy_id} not found")
# Perform comprehensive quality analysis
quality_metrics = await self._analyze_quality_metrics(strategy_data)
# Calculate overall score
overall_score = self._calculate_overall_score(quality_metrics)
overall_status = self._determine_overall_status(overall_score)
# Generate AI recommendations
recommendations = await self._generate_ai_recommendations(strategy_data, quality_metrics)
# Calculate confidence score
confidence_score = self._calculate_confidence_score(quality_metrics)
result = QualityAnalysisResult(
overall_score=overall_score,
overall_status=overall_status,
metrics=quality_metrics,
recommendations=recommendations,
confidence_score=confidence_score,
analysis_timestamp=datetime.utcnow(),
strategy_id=strategy_id
)
# Save analysis result to database
await self._save_quality_analysis(result)
logger.info(f"Quality analysis completed for strategy {strategy_id}. Score: {overall_score}")
return result
except Exception as e:
logger.error(f"Error analyzing strategy quality for {strategy_id}: {e}")
raise
async def _analyze_quality_metrics(self, strategy_data: Dict[str, Any]) -> List[QualityMetric]:
"""Analyze individual quality metrics for a strategy."""
metrics = []
# 1. Strategic Completeness Analysis
completeness_metric = await self._analyze_strategic_completeness(strategy_data)
metrics.append(completeness_metric)
# 2. Audience Intelligence Quality
audience_metric = await self._analyze_audience_intelligence(strategy_data)
metrics.append(audience_metric)
# 3. Competitive Intelligence Quality
competitive_metric = await self._analyze_competitive_intelligence(strategy_data)
metrics.append(competitive_metric)
# 4. Content Strategy Quality
content_metric = await self._analyze_content_strategy(strategy_data)
metrics.append(content_metric)
# 5. Performance Alignment Quality
performance_metric = await self._analyze_performance_alignment(strategy_data)
metrics.append(performance_metric)
# 6. Implementation Feasibility
feasibility_metric = await self._analyze_implementation_feasibility(strategy_data)
metrics.append(feasibility_metric)
return metrics
async def _analyze_strategic_completeness(self, strategy_data: Dict[str, Any]) -> QualityMetric:
"""Analyze strategic completeness and depth."""
try:
# Check required fields
required_fields = [
'business_objectives', 'target_metrics', 'content_budget',
'team_size', 'implementation_timeline', 'market_share'
]
filled_fields = sum(1 for field in required_fields if strategy_data.get(field))
completeness_score = (filled_fields / len(required_fields)) * 100
# AI analysis of strategic depth
prompt = f"""
Analyze the strategic completeness of this content strategy:
Business Objectives: {strategy_data.get('business_objectives', 'Not provided')}
Target Metrics: {strategy_data.get('target_metrics', 'Not provided')}
Content Budget: {strategy_data.get('content_budget', 'Not provided')}
Team Size: {strategy_data.get('team_size', 'Not provided')}
Implementation Timeline: {strategy_data.get('implementation_timeline', 'Not provided')}
Market Share: {strategy_data.get('market_share', 'Not provided')}
Provide a quality score (0-100), status (excellent/good/needs_attention/poor), description, and specific recommendations for improvement.
Focus on strategic depth, clarity, and measurability.
"""
ai_response = await gemini_structured_json_response(
prompt=prompt,
schema=QUALITY_ANALYSIS_SCHEMA,
temperature=0.3,
max_tokens=2048
)
if "error" in ai_response:
raise ValueError(f"AI analysis failed: {ai_response['error']}")
# Parse AI response
ai_score = ai_response.get('score', 60.0)
ai_status = ai_response.get('status', 'needs_attention')
description = ai_response.get('description', 'Strategic completeness analysis')
recommendations = ai_response.get('recommendations', [])
# Combine manual and AI scores
final_score = (completeness_score * 0.4) + (ai_score * 0.6)
return QualityMetric(
name="Strategic Completeness",
score=final_score,
weight=0.25,
status=self._parse_status(ai_status),
description=description,
recommendations=recommendations
)
except Exception as e:
logger.error(f"Error analyzing strategic completeness: {e}")
raise ValueError(f"Failed to analyze strategic completeness: {str(e)}")
async def _analyze_audience_intelligence(self, strategy_data: Dict[str, Any]) -> QualityMetric:
"""Analyze audience intelligence quality."""
try:
audience_fields = [
'content_preferences', 'consumption_patterns', 'audience_pain_points',
'buying_journey', 'seasonal_trends', 'engagement_metrics'
]
filled_fields = sum(1 for field in audience_fields if strategy_data.get(field))
completeness_score = (filled_fields / len(audience_fields)) * 100
# AI analysis of audience insights
prompt = f"""
Analyze the audience intelligence quality of this content strategy:
Content Preferences: {strategy_data.get('content_preferences', 'Not provided')}
Consumption Patterns: {strategy_data.get('consumption_patterns', 'Not provided')}
Audience Pain Points: {strategy_data.get('audience_pain_points', 'Not provided')}
Buying Journey: {strategy_data.get('buying_journey', 'Not provided')}
Seasonal Trends: {strategy_data.get('seasonal_trends', 'Not provided')}
Engagement Metrics: {strategy_data.get('engagement_metrics', 'Not provided')}
Provide a quality score (0-100), status (excellent/good/needs_attention/poor), description, and specific recommendations for improvement.
Focus on audience understanding, segmentation, and actionable insights.
"""
ai_response = await gemini_structured_json_response(
prompt=prompt,
schema=QUALITY_ANALYSIS_SCHEMA,
temperature=0.3,
max_tokens=2048
)
if "error" in ai_response:
raise ValueError(f"AI analysis failed: {ai_response['error']}")
ai_score = ai_response.get('score', 60.0)
ai_status = ai_response.get('status', 'needs_attention')
description = ai_response.get('description', 'Audience intelligence analysis')
recommendations = ai_response.get('recommendations', [])
final_score = (completeness_score * 0.3) + (ai_score * 0.7)
return QualityMetric(
name="Audience Intelligence",
score=final_score,
weight=0.20,
status=self._parse_status(ai_status),
description=description,
recommendations=recommendations
)
except Exception as e:
logger.error(f"Error analyzing audience intelligence: {e}")
raise ValueError(f"Failed to analyze audience intelligence: {str(e)}")
async def _analyze_competitive_intelligence(self, strategy_data: Dict[str, Any]) -> QualityMetric:
"""Analyze competitive intelligence quality."""
try:
competitive_fields = [
'top_competitors', 'competitor_content_strategies', 'market_gaps',
'industry_trends', 'emerging_trends'
]
filled_fields = sum(1 for field in competitive_fields if strategy_data.get(field))
completeness_score = (filled_fields / len(competitive_fields)) * 100
# AI analysis of competitive insights
prompt = f"""
Analyze the competitive intelligence quality of this content strategy:
Top Competitors: {strategy_data.get('top_competitors', 'Not provided')}
Competitor Content Strategies: {strategy_data.get('competitor_content_strategies', 'Not provided')}
Market Gaps: {strategy_data.get('market_gaps', 'Not provided')}
Industry Trends: {strategy_data.get('industry_trends', 'Not provided')}
Emerging Trends: {strategy_data.get('emerging_trends', 'Not provided')}
Provide a quality score (0-100), status (excellent/good/needs_attention/poor), description, and specific recommendations for improvement.
Focus on competitive positioning, differentiation opportunities, and market insights.
"""
ai_response = await gemini_structured_json_response(
prompt=prompt,
schema=QUALITY_ANALYSIS_SCHEMA,
temperature=0.3,
max_tokens=2048
)
if "error" in ai_response:
raise ValueError(f"AI analysis failed: {ai_response['error']}")
ai_score = ai_response.get('score', 60.0)
ai_status = ai_response.get('status', 'needs_attention')
description = ai_response.get('description', 'Competitive intelligence analysis')
recommendations = ai_response.get('recommendations', [])
final_score = (completeness_score * 0.3) + (ai_score * 0.7)
return QualityMetric(
name="Competitive Intelligence",
score=final_score,
weight=0.15,
status=self._parse_status(ai_status),
description=description,
recommendations=recommendations
)
except Exception as e:
logger.error(f"Error analyzing competitive intelligence: {e}")
raise ValueError(f"Failed to analyze competitive intelligence: {str(e)}")
async def _analyze_content_strategy(self, strategy_data: Dict[str, Any]) -> QualityMetric:
"""Analyze content strategy quality."""
try:
content_fields = [
'preferred_formats', 'content_mix', 'content_frequency',
'optimal_timing', 'quality_metrics', 'editorial_guidelines', 'brand_voice'
]
filled_fields = sum(1 for field in content_fields if strategy_data.get(field))
completeness_score = (filled_fields / len(content_fields)) * 100
# AI analysis of content strategy
prompt = f"""
Analyze the content strategy quality:
Preferred Formats: {strategy_data.get('preferred_formats', 'Not provided')}
Content Mix: {strategy_data.get('content_mix', 'Not provided')}
Content Frequency: {strategy_data.get('content_frequency', 'Not provided')}
Optimal Timing: {strategy_data.get('optimal_timing', 'Not provided')}
Quality Metrics: {strategy_data.get('quality_metrics', 'Not provided')}
Editorial Guidelines: {strategy_data.get('editorial_guidelines', 'Not provided')}
Brand Voice: {strategy_data.get('brand_voice', 'Not provided')}
Provide a quality score (0-100), status (excellent/good/needs_attention/poor), description, and specific recommendations for improvement.
Focus on content planning, execution strategy, and quality standards.
"""
ai_response = await gemini_structured_json_response(
prompt=prompt,
schema=QUALITY_ANALYSIS_SCHEMA,
temperature=0.3,
max_tokens=2048
)
if "error" in ai_response:
raise ValueError(f"AI analysis failed: {ai_response['error']}")
ai_score = ai_response.get('score', 60.0)
ai_status = ai_response.get('status', 'needs_attention')
description = ai_response.get('description', 'Content strategy analysis')
recommendations = ai_response.get('recommendations', [])
final_score = (completeness_score * 0.3) + (ai_score * 0.7)
return QualityMetric(
name="Content Strategy",
score=final_score,
weight=0.20,
status=self._parse_status(ai_status),
description=description,
recommendations=recommendations
)
except Exception as e:
logger.error(f"Error analyzing content strategy: {e}")
raise ValueError(f"Failed to analyze content strategy: {str(e)}")
async def _analyze_performance_alignment(self, strategy_data: Dict[str, Any]) -> QualityMetric:
"""Analyze performance alignment quality."""
try:
performance_fields = [
'traffic_sources', 'conversion_rates', 'content_roi_targets',
'ab_testing_capabilities'
]
filled_fields = sum(1 for field in performance_fields if strategy_data.get(field))
completeness_score = (filled_fields / len(performance_fields)) * 100
# AI analysis of performance alignment
prompt = f"""
Analyze the performance alignment quality:
Traffic Sources: {strategy_data.get('traffic_sources', 'Not provided')}
Conversion Rates: {strategy_data.get('conversion_rates', 'Not provided')}
Content ROI Targets: {strategy_data.get('content_roi_targets', 'Not provided')}
A/B Testing Capabilities: {strategy_data.get('ab_testing_capabilities', 'Not provided')}
Provide a quality score (0-100), status (excellent/good/needs_attention/poor), description, and specific recommendations for improvement.
Focus on performance measurement, optimization, and ROI alignment.
"""
ai_response = await gemini_structured_json_response(
prompt=prompt,
schema=QUALITY_ANALYSIS_SCHEMA,
temperature=0.3,
max_tokens=2048
)
if "error" in ai_response:
raise ValueError(f"AI analysis failed: {ai_response['error']}")
ai_score = ai_response.get('score', 60.0)
ai_status = ai_response.get('status', 'needs_attention')
description = ai_response.get('description', 'Performance alignment analysis')
recommendations = ai_response.get('recommendations', [])
final_score = (completeness_score * 0.3) + (ai_score * 0.7)
return QualityMetric(
name="Performance Alignment",
score=final_score,
weight=0.15,
status=self._parse_status(ai_status),
description=description,
recommendations=recommendations
)
except Exception as e:
logger.error(f"Error analyzing performance alignment: {e}")
raise ValueError(f"Failed to analyze performance alignment: {str(e)}")
async def _analyze_implementation_feasibility(self, strategy_data: Dict[str, Any]) -> QualityMetric:
"""Analyze implementation feasibility."""
try:
# Check resource availability
has_budget = bool(strategy_data.get('content_budget'))
has_team = bool(strategy_data.get('team_size'))
has_timeline = bool(strategy_data.get('implementation_timeline'))
resource_score = ((has_budget + has_team + has_timeline) / 3) * 100
# AI analysis of feasibility
prompt = f"""
Analyze the implementation feasibility of this content strategy:
Content Budget: {strategy_data.get('content_budget', 'Not provided')}
Team Size: {strategy_data.get('team_size', 'Not provided')}
Implementation Timeline: {strategy_data.get('implementation_timeline', 'Not provided')}
Industry: {strategy_data.get('industry', 'Not provided')}
Market Share: {strategy_data.get('market_share', 'Not provided')}
Provide a quality score (0-100), status (excellent/good/needs_attention/poor), description, and specific recommendations for improvement.
Focus on resource availability, timeline feasibility, and implementation challenges.
"""
ai_response = await gemini_structured_json_response(
prompt=prompt,
schema=QUALITY_ANALYSIS_SCHEMA,
temperature=0.3,
max_tokens=2048
)
if "error" in ai_response:
raise ValueError(f"AI analysis failed: {ai_response['error']}")
ai_score = ai_response.get('score', 60.0)
ai_status = ai_response.get('status', 'needs_attention')
description = ai_response.get('description', 'Implementation feasibility analysis')
recommendations = ai_response.get('recommendations', [])
final_score = (resource_score * 0.4) + (ai_score * 0.6)
return QualityMetric(
name="Implementation Feasibility",
score=final_score,
weight=0.05,
status=self._parse_status(ai_status),
description=description,
recommendations=recommendations
)
except Exception as e:
logger.error(f"Error analyzing implementation feasibility: {e}")
raise ValueError(f"Failed to analyze implementation feasibility: {str(e)}")
def _calculate_overall_score(self, metrics: List[QualityMetric]) -> float:
"""Calculate weighted overall quality score."""
if not metrics:
return 0.0
weighted_sum = sum(metric.score * metric.weight for metric in metrics)
total_weight = sum(metric.weight for metric in metrics)
return weighted_sum / total_weight if total_weight > 0 else 0.0
def _determine_overall_status(self, score: float) -> QualityScore:
"""Determine overall quality status based on score."""
if score >= 85:
return QualityScore.EXCELLENT
elif score >= 70:
return QualityScore.GOOD
elif score >= 50:
return QualityScore.NEEDS_ATTENTION
else:
return QualityScore.POOR
def _parse_status(self, status_str: str) -> QualityScore:
"""Parse status string to QualityScore enum."""
status_lower = status_str.lower()
if status_lower == 'excellent':
return QualityScore.EXCELLENT
elif status_lower == 'good':
return QualityScore.GOOD
elif status_lower == 'needs_attention':
return QualityScore.NEEDS_ATTENTION
elif status_lower == 'poor':
return QualityScore.POOR
else:
return QualityScore.NEEDS_ATTENTION
async def _generate_ai_recommendations(self, strategy_data: Dict[str, Any], metrics: List[QualityMetric]) -> List[str]:
"""Generate AI-powered recommendations for strategy improvement."""
try:
# Identify areas needing improvement
low_metrics = [m for m in metrics if m.status in [QualityScore.NEEDS_ATTENTION, QualityScore.POOR]]
if not low_metrics:
return ["Strategy quality is excellent. Continue monitoring and optimizing based on performance data."]
# Generate specific recommendations
prompt = f"""
Based on the quality analysis of this content strategy, provide 3-5 specific, actionable recommendations for improvement.
Strategy Overview:
- Industry: {strategy_data.get('industry', 'Not specified')}
- Business Objectives: {strategy_data.get('business_objectives', 'Not specified')}
Areas needing improvement:
{chr(10).join([f"- {m.name}: {m.score:.1f}/100" for m in low_metrics])}
Provide specific, actionable recommendations that can be implemented immediately.
Focus on the most impactful improvements first.
"""
ai_response = await gemini_structured_json_response(
prompt=prompt,
schema=RECOMMENDATIONS_SCHEMA,
temperature=0.3,
max_tokens=2048
)
if "error" in ai_response:
raise ValueError(f"AI recommendations failed: {ai_response['error']}")
recommendations = ai_response.get('recommendations', [])
return recommendations[:5] # Limit to 5 recommendations
except Exception as e:
logger.error(f"Error generating AI recommendations: {e}")
raise ValueError(f"Failed to generate AI recommendations: {str(e)}")
def _calculate_confidence_score(self, metrics: List[QualityMetric]) -> float:
"""Calculate confidence score based on data quality and analysis depth."""
if not metrics:
return 0.0
# Higher scores indicate more confidence
avg_score = sum(m.score for m in metrics) / len(metrics)
# More metrics analyzed = higher confidence
metric_count_factor = min(len(metrics) / 6, 1.0) # 6 is max expected metrics
confidence = (avg_score * 0.7) + (metric_count_factor * 100 * 0.3)
return min(confidence, 100.0)
async def _save_quality_analysis(self, result: QualityAnalysisResult) -> bool:
"""Save quality analysis result to database."""
try:
# This would save to a quality_analysis_results table
# For now, we'll log the result
logger.info(f"Quality analysis saved for strategy {result.strategy_id}")
return True
except Exception as e:
logger.error(f"Error saving quality analysis: {e}")
return False
async def get_quality_history(self, strategy_id: int, days: int = 30) -> List[QualityAnalysisResult]:
"""Get quality analysis history for a strategy."""
try:
# This would query the quality_analysis_results table
# For now, return empty list
return []
except Exception as e:
logger.error(f"Error getting quality history: {e}")
return []
async def get_quality_trends(self, strategy_id: int) -> Dict[str, Any]:
"""Get quality trends over time."""
try:
# This would analyze quality trends over time
# For now, return empty data
return {
"trend": "stable",
"change_rate": 0,
"consistency_score": 0
}
except Exception as e:
logger.error(f"Error getting quality trends: {e}")
return {"trend": "stable", "change_rate": 0, "consistency_score": 0}