873 lines
37 KiB
Python
873 lines
37 KiB
Python
"""
|
|
Blog Content SEO Analyzer
|
|
|
|
Specialized SEO analyzer for blog content with parallel processing.
|
|
Leverages existing non-AI SEO tools and uses single AI prompt for structured analysis.
|
|
"""
|
|
|
|
import asyncio
|
|
import re
|
|
import textstat
|
|
from datetime import datetime
|
|
from typing import Dict, Any, List, Optional
|
|
from utils.logger_utils import get_service_logger
|
|
|
|
from services.seo_analyzer import (
|
|
ContentAnalyzer, KeywordAnalyzer,
|
|
URLStructureAnalyzer, AIInsightGenerator
|
|
)
|
|
from services.llm_providers.main_text_generation import llm_text_gen
|
|
|
|
|
|
class BlogContentSEOAnalyzer:
    """Specialized SEO analyzer for blog content with parallel processing.

    The analysis runs in three phases (see ``analyze_blog_content``):

    1. rule-based ("non-AI") analyzers over the markdown content,
    2. a single structured LLM call,
    3. compilation of scores, recommendations and visualization data.

    Every phase fails fast: errors are logged and re-raised, no fallback
    result is ever produced.
    """

    # Names of the non-AI analyzers, in the order their coroutines are
    # scheduled. The same names are the keys of the non-AI result dict.
    _NON_AI_TASK_NAMES = (
        'content_structure',
        'keyword_analysis',
        'readability_analysis',
        'content_quality',
        'heading_structure',
    )

    def __init__(self):
        """Initialize the blog content SEO analyzer"""
        # Service-specific logger (no global reconfiguration).
        # The logger is published as a module-level name because every
        # method of this class logs through the global ``logger``.
        global logger
        logger = get_service_logger("blog_content_seo_analyzer")

        self.content_analyzer = ContentAnalyzer()
        self.keyword_analyzer = KeywordAnalyzer()
        self.url_analyzer = URLStructureAnalyzer()
        self.ai_insights = AIInsightGenerator()

        logger.info("BlogContentSEOAnalyzer initialized")

    async def analyze_blog_content(self, blog_content: str, research_data: Dict[str, Any], blog_title: Optional[str] = None) -> Dict[str, Any]:
        """
        Main analysis method with parallel processing

        Args:
            blog_content: The blog content to analyze
            research_data: Research data containing keywords and other insights
            blog_title: Accepted for interface compatibility; not used by
                this method.

        Returns:
            Comprehensive SEO analysis results

        Raises:
            Exception: Any failure of a phase is logged and re-raised.
        """
        try:
            logger.info("Starting blog content SEO analysis")

            # Extract keywords from research data
            keywords_data = self._extract_keywords_from_research(research_data)
            logger.info(f"Extracted keywords: {keywords_data}")

            # Phase 1: Run non-AI analyzers in parallel
            logger.info("Running non-AI analyzers in parallel")
            non_ai_results = await self._run_non_ai_analyzers(blog_content, keywords_data)

            # Phase 2: Single AI analysis for structured insights
            logger.info("Running AI analysis")
            ai_insights = await self._run_ai_analysis(blog_content, keywords_data, non_ai_results)

            # Phase 3: Compile and format results
            logger.info("Compiling results")
            results = self._compile_blog_seo_results(non_ai_results, ai_insights, keywords_data)

            logger.info(f"SEO analysis completed. Overall score: {results.get('overall_score', 0)}")
            return results

        except Exception as e:
            logger.error(f"Blog SEO analysis failed: {e}")
            # Fail fast - don't return fallback data
            raise

    def _extract_keywords_from_research(self, research_data: Dict[str, Any]) -> Dict[str, Any]:
        """Extract keywords from research data.

        Returns a dict with the keys ``primary``, ``long_tail``,
        ``semantic``, ``all_keywords`` and ``search_intent``.

        Raises:
            ValueError: If the research data cannot be read; the original
                error is chained as the cause.
        """
        try:
            logger.info(f"Extracting keywords from research data: {research_data}")

            # Extract keywords from research data structure
            keyword_analysis = research_data.get('keyword_analysis', {})
            logger.info(f"Found keyword_analysis: {keyword_analysis}")

            # Primary keywords may live in keyword_analysis or, as a
            # fallback, in a top-level 'keywords' entry.
            if 'primary' in keyword_analysis:
                primary_keywords = keyword_analysis.get('primary', [])
            elif 'keywords' in research_data:
                primary_keywords = research_data.get('keywords', [])
            else:
                primary_keywords = []

            result = {
                'primary': primary_keywords,
                'long_tail': keyword_analysis.get('long_tail', []),
                # Handle both 'semantic' and 'semantic_keywords' field names
                'semantic': keyword_analysis.get('semantic', []) or keyword_analysis.get('semantic_keywords', []),
                'all_keywords': keyword_analysis.get('all_keywords', primary_keywords),
                'search_intent': keyword_analysis.get('search_intent', 'informational')
            }

            logger.info(f"Extracted keywords: {result}")
            return result

        except Exception as e:
            logger.error(f"Failed to extract keywords from research data: {e}")
            logger.error(f"Research data structure: {research_data}")
            # Fail fast - don't return empty keywords
            raise ValueError(f"Keyword extraction failed: {e}") from e

    async def _run_non_ai_analyzers(self, blog_content: str, keywords_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run all non-AI analyzers and collect their results by name.

        Returns a dict keyed by the entries of ``_NON_AI_TASK_NAMES``.
        The first analyzer that failed (in scheduling order) has its
        exception logged and re-raised.
        """
        logger.info(f"Starting non-AI analyzers with content length: {len(blog_content)} chars")
        logger.info(f"Keywords data: {keywords_data}")

        # Order must match _NON_AI_TASK_NAMES.
        # NOTE: none of these coroutines awaits anything, so gather()
        # executes them one after another on the event loop.
        tasks = [
            self._analyze_content_structure(blog_content),
            self._analyze_keyword_usage(blog_content, keywords_data),
            self._analyze_readability(blog_content),
            self._analyze_content_quality(blog_content),
            self._analyze_heading_structure(blog_content)
        ]

        results = await asyncio.gather(*tasks, return_exceptions=True)
        task_names = self._NON_AI_TASK_NAMES

        # Check for exceptions and fail fast
        for name, result in zip(task_names, results):
            if isinstance(result, Exception):
                logger.error(f"Task {name} failed: {result}")
                raise result

        # Log successful results
        for name, result in zip(task_names, results):
            logger.info(f"✅ {name} completed: {type(result).__name__} with {len(result) if isinstance(result, dict) else 'N/A'} fields")

        return dict(zip(task_names, results))

    async def _analyze_content_structure(self, content: str) -> Dict[str, Any]:
        """Analyze blog content structure of markdown ``content``.

        Counts are line based: a "section" is any line starting with
        ``##`` and a "paragraph" is any non-blank line that does not start
        with ``#``. Sentences are counted as runs of ``.``, ``!`` or ``?``.
        """
        try:
            # Parse markdown content
            lines = content.split('\n')

            # Count sections, paragraphs, sentences
            sections = sum(1 for line in lines if line.startswith('##'))
            paragraphs = sum(1 for line in lines if line.strip() and not line.startswith('#'))
            sentences = len(re.findall(r'[.!?]+', content))

            # Blog-specific structure analysis: intro markers are searched
            # in the first 10 lines, conclusion markers in the last 10.
            has_introduction = any('introduction' in line.lower() or 'overview' in line.lower()
                                   for line in lines[:10])
            has_conclusion = any('conclusion' in line.lower() or 'summary' in line.lower()
                                 for line in lines[-10:])
            has_cta = any('call to action' in line.lower() or 'learn more' in line.lower()
                          for line in lines)

            structure_score = self._calculate_structure_score(sections, paragraphs, has_introduction, has_conclusion)

            return {
                'total_sections': sections,
                'total_paragraphs': paragraphs,
                'total_sentences': sentences,
                'has_introduction': has_introduction,
                'has_conclusion': has_conclusion,
                'has_call_to_action': has_cta,
                'structure_score': structure_score,
                'recommendations': self._get_structure_recommendations(sections, has_introduction, has_conclusion)
            }
        except Exception as e:
            logger.error(f"Content structure analysis failed: {e}")
            raise

    async def _analyze_keyword_usage(self, content: str, keywords_data: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze keyword usage and optimization.

        Density, heading presence and first occurrence are computed for
        the primary keywords only; long-tail and semantic keywords are
        passed through unchanged. Matching is case-insensitive substring
        matching.
        """
        try:
            # Extract keywords from research data
            primary_keywords = keywords_data.get('primary', [])
            long_tail_keywords = keywords_data.get('long_tail', [])
            semantic_keywords = keywords_data.get('semantic', [])

            # Use existing KeywordAnalyzer. Its return value is not part
            # of the result; the call is kept so that any error it raises
            # still aborts the analysis.
            self.keyword_analyzer.analyze(content, primary_keywords)

            content_lower = content.lower()

            # Blog-specific keyword analysis
            keyword_analysis = {
                'primary_keywords': primary_keywords,
                'long_tail_keywords': long_tail_keywords,
                'semantic_keywords': semantic_keywords,
                'keyword_density': {},
                'keyword_distribution': {},
                'missing_keywords': [],
                'over_optimization': [],
                'recommendations': []
            }

            # Analyze each keyword type
            for keyword in primary_keywords:
                density = self._calculate_keyword_density(content, keyword)
                keyword_analysis['keyword_density'][keyword] = density
                keyword_analysis['keyword_distribution'][keyword] = {
                    'density': density,
                    'in_headings': self._keyword_in_headings(content, keyword),
                    # -1 when the keyword does not occur at all
                    'first_occurrence': content_lower.find(keyword.lower())
                }

            # Check for missing important keywords
            keyword_analysis['missing_keywords'] = [
                keyword for keyword in primary_keywords
                if keyword.lower() not in content_lower
            ]

            # Check for over-optimization (over 3% density)
            keyword_analysis['over_optimization'] = [
                keyword for keyword, density in keyword_analysis['keyword_density'].items()
                if density > 3.0
            ]

            return keyword_analysis
        except Exception as e:
            logger.error(f"Keyword analysis failed: {e}")
            raise

    async def _analyze_readability(self, content: str) -> Dict[str, Any]:
        """Analyze content readability using textstat integration"""
        try:
            # Calculate readability metrics
            readability_metrics = {
                'flesch_reading_ease': textstat.flesch_reading_ease(content),
                'flesch_kincaid_grade': textstat.flesch_kincaid_grade(content),
                'gunning_fog': textstat.gunning_fog(content),
                'smog_index': textstat.smog_index(content),
                'automated_readability': textstat.automated_readability_index(content),
                'coleman_liau': textstat.coleman_liau_index(content)
            }

            # Blog-specific readability analysis
            avg_sentence_length = self._calculate_avg_sentence_length(content)
            avg_paragraph_length = self._calculate_avg_paragraph_length(content)

            return {
                'metrics': readability_metrics,
                'avg_sentence_length': avg_sentence_length,
                'avg_paragraph_length': avg_paragraph_length,
                'readability_score': self._calculate_readability_score(readability_metrics),
                'target_audience': self._determine_target_audience(readability_metrics),
                'recommendations': self._get_readability_recommendations(readability_metrics, avg_sentence_length)
            }
        except Exception as e:
            logger.error(f"Readability analysis failed: {e}")
            raise

    async def _analyze_content_quality(self, content: str) -> Dict[str, Any]:
        """Analyze overall content quality (length, vocabulary, transitions)"""
        try:
            # Word count analysis
            words = content.split()
            word_count = len(words)

            # Content depth analysis
            unique_words = len({word.lower() for word in words})
            vocabulary_diversity = unique_words / word_count if word_count > 0 else 0.0

            # Content flow analysis (case-insensitive substring counts)
            transition_words = ['however', 'therefore', 'furthermore', 'moreover', 'additionally', 'consequently']
            content_lower = content.lower()
            transition_count = sum(content_lower.count(word) for word in transition_words)

            return {
                'word_count': word_count,
                'unique_words': unique_words,
                'vocabulary_diversity': vocabulary_diversity,
                'transition_words_used': transition_count,
                'content_depth_score': self._calculate_content_depth_score(word_count, vocabulary_diversity),
                'flow_score': self._calculate_flow_score(transition_count, word_count),
                'recommendations': self._get_content_quality_recommendations(word_count, vocabulary_diversity, transition_count)
            }
        except Exception as e:
            logger.error(f"Content quality analysis failed: {e}")
            raise

    async def _analyze_heading_structure(self, content: str) -> Dict[str, Any]:
        """Analyze heading structure and hierarchy of markdown H1-H3 headings"""
        try:
            # Extract headings
            h1_headings = re.findall(r'^# (.+)$', content, re.MULTILINE)
            h2_headings = re.findall(r'^## (.+)$', content, re.MULTILINE)
            h3_headings = re.findall(r'^### (.+)$', content, re.MULTILINE)

            return {
                'h1_count': len(h1_headings),
                'h2_count': len(h2_headings),
                'h3_count': len(h3_headings),
                'h1_headings': h1_headings,
                'h2_headings': h2_headings,
                'h3_headings': h3_headings,
                'heading_hierarchy_score': self._calculate_heading_hierarchy_score(h1_headings, h2_headings, h3_headings),
                'recommendations': self._get_heading_recommendations(h1_headings, h2_headings, h3_headings)
            }
        except Exception as e:
            logger.error(f"Heading structure analysis failed: {e}")
            raise

    # Helper methods for calculations and scoring
    def _calculate_structure_score(self, sections: int, paragraphs: int, has_intro: bool, has_conclusion: bool) -> int:
        """Calculate content structure score (0-100)"""
        score = 0

        # Section count (optimal: 3-8 sections)
        if 3 <= sections <= 8:
            score += 30
        elif sections < 3:
            score += 15
        else:
            score += 20

        # Paragraph count (optimal: 8-20 paragraphs)
        if 8 <= paragraphs <= 20:
            score += 30
        elif paragraphs < 8:
            score += 15
        else:
            score += 20

        # Introduction and conclusion
        if has_intro:
            score += 20
        if has_conclusion:
            score += 20

        return min(score, 100)

    def _calculate_keyword_density(self, content: str, keyword: str) -> float:
        """Calculate keyword density percentage.

        Density is the number of case-insensitive substring occurrences
        of ``keyword`` per whitespace-separated word of ``content``,
        times 100. Empty content or a blank keyword yields ``0.0``.
        """
        keyword_lower = keyword.lower()
        word_count = len(content.split())

        # str.count("") reports len(content) + 1 matches, which would turn
        # a blank keyword into a huge, meaningless density.
        if word_count == 0 or not keyword_lower.strip():
            return 0.0

        keyword_count = content.lower().count(keyword_lower)
        return keyword_count / word_count * 100

    def _keyword_in_headings(self, content: str, keyword: str) -> bool:
        """Check if keyword appears in any markdown heading"""
        headings = re.findall(r'^#+ (.+)$', content, re.MULTILINE)
        keyword_lower = keyword.lower()
        return any(keyword_lower in heading.lower() for heading in headings)

    def _calculate_avg_sentence_length(self, content: str) -> float:
        """Calculate average sentence length in words"""
        sentences = [s.strip() for s in re.split(r'[.!?]+', content) if s.strip()]

        if not sentences:
            return 0.0

        total_words = sum(len(sentence.split()) for sentence in sentences)
        return total_words / len(sentences)

    def _calculate_avg_paragraph_length(self, content: str) -> float:
        """Calculate average paragraph length in words (blank-line separated)"""
        paragraphs = [p.strip() for p in content.split('\n\n') if p.strip()]

        if not paragraphs:
            return 0.0

        total_words = sum(len(paragraph.split()) for paragraph in paragraphs)
        return total_words / len(paragraphs)

    def _calculate_readability_score(self, metrics: Dict[str, float]) -> int:
        """Calculate overall readability score from Flesch Reading Ease"""
        # Flesch Reading Ease (higher is better)
        flesch_score = metrics.get('flesch_reading_ease', 0)

        # Map to a coarse 50-90 band
        if flesch_score >= 80:
            return 90
        elif flesch_score >= 60:
            return 80
        elif flesch_score >= 40:
            return 70
        elif flesch_score >= 20:
            return 60
        else:
            return 50

    def _determine_target_audience(self, metrics: Dict[str, float]) -> str:
        """Determine target audience based on readability metrics"""
        flesch_score = metrics.get('flesch_reading_ease', 0)

        if flesch_score >= 80:
            return "General audience (8th grade level)"
        elif flesch_score >= 60:
            return "High school level"
        elif flesch_score >= 40:
            return "College level"
        else:
            return "Graduate level"

    def _calculate_content_depth_score(self, word_count: int, vocabulary_diversity: float) -> int:
        """Calculate content depth score (0-100)"""
        score = 0

        # Word count (optimal: 800-2000 words)
        if 800 <= word_count <= 2000:
            score += 50
        elif word_count < 800:
            score += 30
        else:
            score += 40

        # Vocabulary diversity (optimal: 0.4-0.7)
        if 0.4 <= vocabulary_diversity <= 0.7:
            score += 50
        elif vocabulary_diversity < 0.4:
            score += 30
        else:
            score += 40

        return min(score, 100)

    def _calculate_flow_score(self, transition_count: int, word_count: int) -> int:
        """Calculate content flow score from transition-word density"""
        if word_count == 0:
            return 0

        transition_density = transition_count / (word_count / 100)

        # Optimal transition density: 1-3 per 100 words
        if 1 <= transition_density <= 3:
            return 90
        elif transition_density < 1:
            return 60
        else:
            return 70

    def _calculate_heading_hierarchy_score(self, h1: List[str], h2: List[str], h3: List[str]) -> int:
        """Calculate heading hierarchy score (0-100)"""
        score = 0

        # Should have exactly 1 H1
        if len(h1) == 1:
            score += 40
        elif len(h1) == 0:
            score += 20
        else:
            score += 10

        # Should have 3-8 H2 headings
        if 3 <= len(h2) <= 8:
            score += 40
        elif len(h2) < 3:
            score += 20
        else:
            score += 30

        # H3 headings are optional but good for structure
        if len(h3) > 0:
            score += 20

        return min(score, 100)

    def _calculate_keyword_score(self, keyword_analysis: Dict[str, Any]) -> int:
        """Calculate keyword optimization score (0-100).

        Points are accumulated per keyword (density band, presence in
        headings, early first occurrence), reduced for missing and
        over-optimized keywords, then clamped to 0-100.
        """
        score = 0

        # Check keyword density (optimal: 1-3%)
        densities = keyword_analysis.get('keyword_density', {})
        for density in densities.values():
            if 1 <= density <= 3:
                score += 30
            elif density < 1:
                score += 15
            else:
                score += 10

        # Check keyword distribution
        distributions = keyword_analysis.get('keyword_distribution', {})
        for dist in distributions.values():
            if dist.get('in_headings', False):
                score += 20
            # Early occurrence. first_occurrence is -1 for a keyword that
            # is absent, so the lower bound keeps missing keywords from
            # earning this bonus.
            if 0 <= dist.get('first_occurrence', -1) < 100:
                score += 20

        # Penalize missing keywords
        missing = len(keyword_analysis.get('missing_keywords', []))
        score -= missing * 10

        # Penalize over-optimization
        over_opt = len(keyword_analysis.get('over_optimization', []))
        score -= over_opt * 15

        return max(0, min(score, 100))

    def _calculate_weighted_score(self, scores: Dict[str, int]) -> int:
        """Calculate weighted overall score; missing categories count as 0"""
        weights = {
            'structure': 0.2,
            'keywords': 0.25,
            'readability': 0.2,
            'quality': 0.15,
            'headings': 0.1,
            'ai_insights': 0.1
        }

        weighted_sum = sum(scores.get(key, 0) * weight for key, weight in weights.items())
        return int(weighted_sum)

    # Recommendation methods
    def _get_structure_recommendations(self, sections: int, has_intro: bool, has_conclusion: bool) -> List[str]:
        """Get structure recommendations"""
        recommendations = []

        if sections < 3:
            recommendations.append("Add more sections to improve content structure")
        elif sections > 8:
            recommendations.append("Consider combining some sections for better flow")

        if not has_intro:
            recommendations.append("Add an introduction section to set context")

        if not has_conclusion:
            recommendations.append("Add a conclusion section to summarize key points")

        return recommendations

    def _get_readability_recommendations(self, metrics: Dict[str, float], avg_sentence_length: float) -> List[str]:
        """Get readability recommendations"""
        recommendations = []

        flesch_score = metrics.get('flesch_reading_ease', 0)

        if flesch_score < 60:
            recommendations.append("Simplify language and use shorter sentences")

        if avg_sentence_length > 20:
            recommendations.append("Break down long sentences for better readability")

        if flesch_score > 80:
            recommendations.append("Consider adding more technical depth for expert audience")

        return recommendations

    def _get_content_quality_recommendations(self, word_count: int, vocabulary_diversity: float, transition_count: int) -> List[str]:
        """Get content quality recommendations"""
        recommendations = []

        if word_count < 800:
            recommendations.append("Expand content with more detailed explanations")
        elif word_count > 2000:
            recommendations.append("Consider breaking into multiple posts")

        if vocabulary_diversity < 0.4:
            recommendations.append("Use more varied vocabulary to improve engagement")

        if transition_count < 3:
            recommendations.append("Add more transition words to improve flow")

        return recommendations

    def _get_heading_recommendations(self, h1: List[str], h2: List[str], h3: List[str]) -> List[str]:
        """Get heading recommendations (``h3`` is accepted but not inspected)"""
        recommendations = []

        if len(h1) == 0:
            recommendations.append("Add a main H1 heading")
        elif len(h1) > 1:
            recommendations.append("Use only one H1 heading per post")

        if len(h2) < 3:
            recommendations.append("Add more H2 headings to structure content")
        elif len(h2) > 8:
            recommendations.append("Consider using H3 headings for better hierarchy")

        return recommendations

    @staticmethod
    def _ai_analysis_schema() -> Dict[str, Any]:
        """Build the JSON schema requested from the LLM (fresh dict per call)"""
        string = {"type": "string"}
        string_list = {"type": "array", "items": {"type": "string"}}
        return {
            "type": "object",
            "properties": {
                "content_quality_insights": {
                    "type": "object",
                    "properties": {
                        "engagement_score": {"type": "number"},
                        "value_proposition": dict(string),
                        "content_gaps": dict(string_list),
                        "improvement_suggestions": dict(string_list)
                    }
                },
                "seo_optimization_insights": {
                    "type": "object",
                    "properties": {
                        "keyword_optimization": dict(string),
                        "content_relevance": dict(string),
                        "search_intent_alignment": dict(string),
                        "seo_improvements": dict(string_list)
                    }
                },
                "user_experience_insights": {
                    "type": "object",
                    "properties": {
                        "content_flow": dict(string),
                        "readability_assessment": dict(string),
                        "engagement_factors": dict(string_list),
                        "ux_improvements": dict(string_list)
                    }
                },
                "competitive_analysis": {
                    "type": "object",
                    "properties": {
                        "content_differentiation": dict(string),
                        "unique_value": dict(string),
                        "competitive_advantages": dict(string_list),
                        "market_positioning": dict(string)
                    }
                }
            }
        }

    async def _run_ai_analysis(self, blog_content: str, keywords_data: Dict[str, Any], non_ai_results: Dict[str, Any]) -> Dict[str, Any]:
        """Run single AI analysis for structured insights (provider-agnostic).

        Returns whatever ``llm_text_gen`` returns for the structured
        request. Failures are logged and re-raised.
        """
        try:
            # Prepare context for AI analysis
            context = {
                'blog_content': blog_content,
                'keywords_data': keywords_data,
                'non_ai_results': non_ai_results
            }

            # Create AI prompt for structured analysis
            prompt = self._create_ai_analysis_prompt(context)
            schema = self._ai_analysis_schema()

            # Provider-agnostic structured response respecting GPT_PROVIDER
            # NOTE(review): llm_text_gen is called synchronously here, so
            # the event loop is blocked for the duration of the call.
            # Offloading it to a worker thread needs confirmation that the
            # provider layer is thread-safe.
            ai_response = llm_text_gen(
                prompt=prompt,
                json_struct=schema,
                system_prompt=None
            )

            return ai_response

        except Exception as e:
            logger.error(f"AI analysis failed: {e}")
            raise

    def _create_ai_analysis_prompt(self, context: Dict[str, Any]) -> str:
        """Create AI analysis prompt.

        ``context`` must contain ``blog_content``, ``keywords_data`` and
        ``non_ai_results``. Only the first 2000 characters of the blog
        content are embedded in the prompt.
        """
        blog_content = context['blog_content']
        keywords_data = context['keywords_data']
        non_ai_results = context['non_ai_results']

        structure_score = non_ai_results.get('content_structure', {}).get('structure_score', 0)
        readability_score = non_ai_results.get('readability_analysis', {}).get('readability_score', 0)
        content_quality_score = non_ai_results.get('content_quality', {}).get('content_depth_score', 0)

        prompt = (
            "\n"
            "Analyze this blog content for SEO optimization and user experience. Provide structured insights based on the content and keyword data.\n"
            "\n"
            "BLOG CONTENT:\n"
            f"{blog_content[:2000]}...\n"
            "\n"
            "KEYWORDS DATA:\n"
            f"Primary Keywords: {keywords_data.get('primary', [])}\n"
            f"Long-tail Keywords: {keywords_data.get('long_tail', [])}\n"
            f"Semantic Keywords: {keywords_data.get('semantic', [])}\n"
            f"Search Intent: {keywords_data.get('search_intent', 'informational')}\n"
            "\n"
            "NON-AI ANALYSIS RESULTS:\n"
            f"Structure Score: {structure_score}\n"
            f"Readability Score: {readability_score}\n"
            f"Content Quality Score: {content_quality_score}\n"
            "\n"
            "Please provide:\n"
            "1. Content Quality Insights: Assess engagement potential, value proposition, content gaps, and improvement suggestions\n"
            "2. SEO Optimization Insights: Evaluate keyword optimization, content relevance, search intent alignment, and SEO improvements\n"
            "3. User Experience Insights: Analyze content flow, readability, engagement factors, and UX improvements\n"
            "4. Competitive Analysis: Identify content differentiation, unique value, competitive advantages, and market positioning\n"
            "\n"
            "Focus on actionable insights that can improve the blog's performance and user engagement.\n"
        )

        return prompt

    def _compile_blog_seo_results(self, non_ai_results: Dict[str, Any], ai_insights: Dict[str, Any], keywords_data: Dict[str, Any]) -> Dict[str, Any]:
        """Compile comprehensive SEO analysis results.

        Raises:
            ValueError: If ``non_ai_results`` or ``ai_insights`` is empty.
        """
        try:
            # Validate required data - fail fast if missing
            if not non_ai_results:
                raise ValueError("Non-AI analysis results are missing")

            if not ai_insights:
                raise ValueError("AI insights are missing")

            # Calculate category scores
            category_scores = {
                'structure': non_ai_results.get('content_structure', {}).get('structure_score', 0),
                'keywords': self._calculate_keyword_score(non_ai_results.get('keyword_analysis', {})),
                'readability': non_ai_results.get('readability_analysis', {}).get('readability_score', 0),
                'quality': non_ai_results.get('content_quality', {}).get('content_depth_score', 0),
                'headings': non_ai_results.get('heading_structure', {}).get('heading_hierarchy_score', 0),
                'ai_insights': ai_insights.get('content_quality_insights', {}).get('engagement_score', 0)
            }

            # Calculate overall score
            overall_score = self._calculate_weighted_score(category_scores)

            return {
                'overall_score': overall_score,
                'category_scores': category_scores,
                'detailed_analysis': non_ai_results,
                'ai_insights': ai_insights,
                'keywords_data': keywords_data,
                'visualization_data': self._create_visualization_data(category_scores, non_ai_results),
                'actionable_recommendations': self._compile_actionable_recommendations(non_ai_results, ai_insights),
                # NOTE: utcnow() yields a naive timestamp and is deprecated
                # as of Python 3.12; kept so the emitted ISO string format
                # does not change for consumers.
                'generated_at': datetime.utcnow().isoformat(),
                'analysis_summary': self._create_analysis_summary(overall_score, category_scores, ai_insights)
            }

        except Exception as e:
            logger.error(f"Results compilation failed: {e}")
            # Fail fast - don't return fallback data
            raise

    def _compile_actionable_recommendations(self, non_ai_results: Dict[str, Any], ai_insights: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Compile actionable recommendations from all sources"""
        # (category, priority, impact, recommendation texts), in output order
        sources = (
            (
                'Structure', 'High',
                'Improves content organization and user experience',
                non_ai_results.get('content_structure', {}).get('recommendations', []),
            ),
            (
                'Keywords', 'High',
                'Improves search engine visibility',
                non_ai_results.get('keyword_analysis', {}).get('recommendations', []),
            ),
            (
                'Readability', 'Medium',
                'Improves user engagement and comprehension',
                non_ai_results.get('readability_analysis', {}).get('recommendations', []),
            ),
            (
                'Content Quality', 'Medium',
                'Enhances content value and engagement',
                ai_insights.get('content_quality_insights', {}).get('improvement_suggestions', []),
            ),
        )

        return [
            {
                'category': category,
                'priority': priority,
                'recommendation': rec,
                'impact': impact
            }
            for category, priority, impact, recs in sources
            for rec in recs
        ]

    def _create_visualization_data(self, category_scores: Dict[str, int], non_ai_results: Dict[str, Any]) -> Dict[str, Any]:
        """Create data for visualization components"""
        keyword_analysis = non_ai_results.get('keyword_analysis', {})
        content_quality = non_ai_results.get('content_quality', {})
        content_structure = non_ai_results.get('content_structure', {})

        return {
            'score_radar': {
                'categories': list(category_scores.keys()),
                'scores': list(category_scores.values()),
                'max_score': 100
            },
            'keyword_analysis': {
                'densities': keyword_analysis.get('keyword_density', {}),
                'missing_keywords': keyword_analysis.get('missing_keywords', []),
                'over_optimization': keyword_analysis.get('over_optimization', [])
            },
            'readability_metrics': non_ai_results.get('readability_analysis', {}).get('metrics', {}),
            'content_stats': {
                'word_count': content_quality.get('word_count', 0),
                'sections': content_structure.get('total_sections', 0),
                'paragraphs': content_structure.get('total_paragraphs', 0)
            }
        }

    def _create_analysis_summary(self, overall_score: int, category_scores: Dict[str, int], ai_insights: Dict[str, Any]) -> Dict[str, Any]:
        """Create analysis summary (grade, status, strongest/weakest category)"""
        # Determine overall grade
        grade, status = 'F', 'Poor'
        for threshold, band_grade, band_status in (
            (90, 'A', 'Excellent'),
            (80, 'B', 'Good'),
            (70, 'C', 'Fair'),
            (60, 'D', 'Needs Improvement'),
        ):
            if overall_score >= threshold:
                grade, status = band_grade, band_status
                break

        # Find strongest and weakest categories (first one wins on ties)
        strongest_category = max(category_scores, key=category_scores.get)
        weakest_category = min(category_scores, key=category_scores.get)

        return {
            'overall_grade': grade,
            'status': status,
            'strongest_category': strongest_category,
            'weakest_category': weakest_category,
            'key_strengths': self._identify_key_strengths(category_scores),
            'key_weaknesses': self._identify_key_weaknesses(category_scores),
            'ai_summary': ai_insights.get('content_quality_insights', {}).get('value_proposition', '')
        }

    def _identify_key_strengths(self, category_scores: Dict[str, int]) -> List[str]:
        """Identify key strengths (categories scoring 80 or more)"""
        return [
            f"Strong {category} optimization"
            for category, score in category_scores.items()
            if score >= 80
        ]

    def _identify_key_weaknesses(self, category_scores: Dict[str, int]) -> List[str]:
        """Identify key weaknesses (categories scoring below 60)"""
        return [
            f"Needs improvement in {category}"
            for category, score in category_scores.items()
            if score < 60
        ]

    def _create_error_result(self, error_message: str) -> Dict[str, Any]:
        """Create error result - this should not be used in fail-fast mode"""
        raise ValueError(f"Error result creation not allowed in fail-fast mode: {error_message}")
|