Updated SEO Analysis Modal

This commit is contained in:
ajaysi
2025-09-22 21:02:32 +05:30
parent f98d49cea7
commit 12119d418b
38 changed files with 5742 additions and 2337 deletions

View File

@@ -179,6 +179,28 @@ async def get_section_continuity(section_id: str) -> Dict[str, Any]:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/flow-analysis/basic")
async def analyze_flow_basic(request: Dict[str, Any]) -> Dict[str, Any]:
    """Run the single-call (cheaper) flow analysis over the whole blog.

    The raw request dict is forwarded to ``service.analyze_flow_basic`` and its
    result is returned unchanged. Any exception is logged and reported as HTTP 500.
    """
    try:
        return await service.analyze_flow_basic(request)
    except Exception as exc:
        logger.error(f"Failed to perform basic flow analysis: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
@router.post("/flow-analysis/advanced")
async def analyze_flow_advanced(request: Dict[str, Any]) -> Dict[str, Any]:
    """Run the per-section (more detailed, more expensive) flow analysis.

    The raw request dict is forwarded to ``service.analyze_flow_advanced`` and its
    result is returned unchanged. Any exception is logged and reported as HTTP 500.
    """
    try:
        return await service.analyze_flow_advanced(request)
    except Exception as exc:
        logger.error(f"Failed to perform advanced flow analysis: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
@router.post("/section/optimize", response_model=BlogOptimizeResponse)
async def optimize_section(request: BlogOptimizeRequest) -> BlogOptimizeResponse:
"""Optimize a specific section for better quality and engagement."""
@@ -326,4 +348,28 @@ async def medium_generation_status(task_id: str):
raise
except Exception as e:
logger.error(f"Failed to get medium generation status for {task_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/rewrite/start")
async def start_blog_rewrite(request: Dict[str, Any]) -> Dict[str, Any]:
    """Start a blog rewrite task driven by user feedback.

    Args:
        request: Raw JSON body, forwarded to ``service.start_blog_rewrite``.

    Returns:
        ``{"task_id": <id>, "status": "started"}``; poll ``/rewrite/status/{task_id}``
        for the outcome.

    Raises:
        HTTPException: 400 when the service raises ``ValueError``, 500 for any
            other failure.
    """
    try:
        task_id = service.start_blog_rewrite(request)
        return {"task_id": task_id, "status": "started"}
    except ValueError as e:
        # A ValueError here is a bad request (e.g. no sections, feedback too short),
        # not a server fault, so report it as 400 rather than 500.
        # NOTE(review): assumes service.start_blog_rewrite surfaces the rewriter's
        # validation ValueErrors unchanged - confirm against the service.
        logger.warning(f"Rejected blog rewrite request: {e}")
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error(f"Failed to start blog rewrite: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/rewrite/status/{task_id}")
async def rewrite_status(task_id: str):
    """Poll the status of a blog rewrite task.

    Args:
        task_id: Id returned by ``/rewrite/start``.

    Returns:
        The status record held by ``service.task_manager`` for this task.

    Raises:
        HTTPException: 404 when the task is unknown, 500 on unexpected errors.
    """
    try:
        status = service.task_manager.get_task_status(task_id)
        # The task manager may signal an unknown id either with None or with a
        # {"status": "not_found"} sentinel record; both must map to 404.
        unknown = status is None or (
            isinstance(status, dict) and status.get("status") == "not_found"
        )
        if unknown:
            raise HTTPException(status_code=404, detail="Task not found")
        return status
    except HTTPException:
        # Let the deliberate 404 through instead of re-wrapping it as a 500.
        raise
    except Exception as e:
        logger.error(f"Failed to get rewrite status for {task_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))

View File

@@ -0,0 +1,275 @@
"""
Blog Writer SEO Analysis API Endpoint
Provides API endpoint for analyzing blog content SEO with parallel processing
and CopilotKit integration for real-time progress updates.
"""
from fastapi import APIRouter, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Dict, Any, Optional
from loguru import logger
from datetime import datetime
from services.blog_writer.seo.blog_content_seo_analyzer import BlogContentSEOAnalyzer
from services.blog_writer.core.blog_writer_service import BlogWriterService
# Router for the blog SEO analysis endpoints; every route below is served under /api/blog-writer/seo
router = APIRouter(prefix="/api/blog-writer/seo", tags=["Blog SEO Analysis"])
class SEOAnalysisRequest(BaseModel):
    """Request model for SEO analysis"""
    # Blog text to analyse; the analyze endpoints reject empty or whitespace-only values with HTTP 400
    blog_content: str
    # Research payload passed to the analyzer as-is; the analyze endpoints reject an empty value with HTTP 400
    research_data: Dict[str, Any]
    # Optional caller identifiers; not read by the endpoints in this module
    user_id: Optional[str] = None
    session_id: Optional[str] = None
class SEOAnalysisResponse(BaseModel):
    """Response model for SEO analysis"""
    # False when the analyzer reported an error; the remaining result fields are then zero/empty
    success: bool
    # UUID string generated per request by the endpoint
    analysis_id: str
    # Values below are copied from the analyzer's result dict under the same keys
    overall_score: float
    category_scores: Dict[str, float]
    analysis_summary: Dict[str, Any]
    actionable_recommendations: list
    generated_at: str
    # Analyzer error message; only set when success is False
    error: Optional[str] = None
class SEOAnalysisProgress(BaseModel):
    """Progress update model for real-time updates"""
    # Id of the analysis run this update belongs to
    analysis_id: str
    # Stage name, e.g. "initialization", "ai_analysis", "completed" or "error"
    stage: str
    # Progress value reported for the stage (the endpoint uses 0-100)
    progress: int
    # Human-readable description of the stage
    message: str
    # ISO-8601 timestamp produced with datetime.utcnow().isoformat()
    timestamp: str
# Initialize analyzer
# Both objects are created once at import time and shared by all requests.
seo_analyzer = BlogContentSEOAnalyzer()
# NOTE(review): blog_writer_service is not referenced by the endpoints visible in this module - confirm it is needed.
blog_writer_service = BlogWriterService()
@router.post("/analyze", response_model=SEOAnalysisResponse)
async def analyze_blog_seo(request: SEOAnalysisRequest):
    """
    Run a full SEO analysis over a blog post and return the scored result.

    The analysis covers content structure, keyword optimization, readability,
    content quality and AI-generated insights.

    Args:
        request: SEOAnalysisRequest carrying the blog content and research data.

    Returns:
        SEOAnalysisResponse. When the analyzer reports an error the response has
        ``success=False``, zero/empty scores and the error message.

    Raises:
        HTTPException: 400 for missing content or research data, 500 for
            unexpected failures.
    """
    try:
        logger.info("Starting SEO analysis for blog content")

        # Reject requests the analyzer cannot work with
        body = request.blog_content
        if not (body and body.strip()):
            raise HTTPException(status_code=400, detail="Blog content is required")
        if not request.research_data:
            raise HTTPException(status_code=400, detail="Research data is required")

        # Each run gets its own identifier
        import uuid
        run_id = str(uuid.uuid4())

        outcome = await seo_analyzer.analyze_blog_content(
            blog_content=request.blog_content,
            research_data=request.research_data,
        )

        # The analyzer signals failure through an 'error' key rather than raising
        if 'error' in outcome:
            failure = outcome['error']
            logger.error(f"SEO analysis failed: {failure}")
            return SEOAnalysisResponse(
                success=False,
                analysis_id=run_id,
                overall_score=0,
                category_scores={},
                analysis_summary={},
                actionable_recommendations=[],
                generated_at=outcome.get('generated_at', ''),
                error=failure,
            )

        # Copy the analyzer output into the response, defaulting any missing key
        score = outcome.get('overall_score', 0)
        per_category = outcome.get('category_scores', {})
        summary = outcome.get('analysis_summary', {})
        advice = outcome.get('actionable_recommendations', [])
        stamped = outcome.get('generated_at', '')
        return SEOAnalysisResponse(
            success=True,
            analysis_id=run_id,
            overall_score=score,
            category_scores=per_category,
            analysis_summary=summary,
            actionable_recommendations=advice,
            generated_at=stamped,
        )
    except HTTPException:
        # Validation errors keep their own status code
        raise
    except Exception as exc:
        logger.error(f"SEO analysis endpoint error: {exc}")
        raise HTTPException(status_code=500, detail=f"SEO analysis failed: {str(exc)}")
@router.post("/analyze-with-progress")
async def analyze_blog_seo_with_progress(request: SEOAnalysisRequest):
    """
    Analyze blog content for SEO and stream progress updates to the client.

    The response is a newline-delimited JSON stream (``application/x-ndjson``).
    Every progress line is a serialized ``SEOAnalysisProgress``. On success the
    last line is the analyzer's result dict. On failure the last line is a
    progress record with ``stage="error"``.

    Args:
        request: SEOAnalysisRequest containing blog content and research data.

    Returns:
        StreamingResponse yielding progress updates followed by the final results.

    Raises:
        HTTPException: 400 for missing content or research data, 500 if the
            stream cannot be set up.
    """
    try:
        logger.info("Starting SEO analysis with progress for blog content")

        # Validate request before any streaming starts so errors keep a proper status code
        if not request.blog_content or not request.blog_content.strip():
            raise HTTPException(status_code=400, detail="Blog content is required")
        if not request.research_data:
            raise HTTPException(status_code=400, detail="Research data is required")

        import json
        import uuid
        from fastapi.responses import StreamingResponse

        analysis_id = str(uuid.uuid4())

        def progress_line(stage: str, progress: int, message: str) -> str:
            """Serialize one progress update as a single NDJSON line."""
            update = SEOAnalysisProgress(
                analysis_id=analysis_id,
                stage=stage,
                progress=progress,
                message=message,
                timestamp=datetime.utcnow().isoformat(),
            )
            return json.dumps(update.dict()) + "\n"

        async def progress_generator():
            try:
                # The analyzer is one awaited call with no progress hooks, so the
                # intermediate stages are announced up front, as before.
                yield progress_line("initialization", 10, "Initializing SEO analysis...")
                yield progress_line("keyword_extraction", 20, "Extracting keywords from research data...")
                yield progress_line("non_ai_analysis", 40, "Running content structure and readability analysis...")
                yield progress_line("ai_analysis", 70, "Generating AI-powered insights...")
                yield progress_line("compilation", 90, "Compiling analysis results...")

                analysis_results = await seo_analyzer.analyze_blog_content(
                    blog_content=request.blog_content,
                    research_data=request.research_data
                )

                # The analyzer reports failure through an 'error' key; do not
                # announce success in that case.
                if isinstance(analysis_results, dict) and 'error' in analysis_results:
                    logger.error(f"SEO analysis failed: {analysis_results['error']}")
                    yield progress_line("error", 0, f"Analysis failed: {analysis_results['error']}")
                    return

                yield progress_line("completed", 100, "SEO analysis completed successfully!")
                # default=str keeps the stream alive if the results hold
                # non-JSON values such as datetimes.
                yield json.dumps(analysis_results, default=str) + "\n"
            except Exception as e:
                # Headers are already sent at this point, so report the failure
                # in-band and end the stream instead of re-raising.
                logger.error(f"Progress generator error: {e}")
                yield progress_line("error", 0, f"Analysis failed: {str(e)}")

        # A bare async generator cannot be serialized by FastAPI; wrap it in a streaming response.
        return StreamingResponse(progress_generator(), media_type="application/x-ndjson")
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"SEO analysis with progress endpoint error: {e}")
        raise HTTPException(status_code=500, detail=f"SEO analysis failed: {str(e)}")
@router.get("/analysis/{analysis_id}")
async def get_analysis_result(analysis_id: str):
    """
    Look up an SEO analysis result by its identifier.

    Args:
        analysis_id: Unique identifier for the analysis.

    Returns:
        A placeholder payload echoing the id; results are not persisted yet.
    """
    try:
        # No storage backend exists yet, so a fixed "completed" payload is returned
        logger.info(f"Retrieving SEO analysis result for ID: {analysis_id}")
        placeholder = dict(
            analysis_id=analysis_id,
            status="completed",
            message="Analysis results retrieved successfully",
        )
        return placeholder
    except Exception as exc:
        logger.error(f"Get analysis result error: {exc}")
        raise HTTPException(status_code=500, detail=f"Failed to retrieve analysis result: {str(exc)}")
@router.get("/health")
async def health_check():
    """Report that the SEO analysis service is up, with the current UTC time."""
    now_iso = datetime.utcnow().isoformat()
    return {"status": "healthy", "service": "blog-seo-analysis", "timestamp": now_iso}

View File

@@ -466,6 +466,13 @@ try:
except Exception as e:
logger.warning(f"AI Blog Writer router not mounted: {e}")
# Include Blog Writer SEO Analysis router (comprehensive SEO analysis)
# Mounting is best-effort: if the import or registration fails, a warning is
# logged and startup continues without these routes.
try:
    from api.blog_writer.seo_analysis import router as blog_seo_analysis_router
    app.include_router(blog_seo_analysis_router)
except Exception as e:
    logger.warning(f"Blog Writer SEO Analysis router not mounted: {e}")
# Include persona router
# Unlike the router above, this import is not guarded, so a failure here propagates.
from api.persona_routes import router as persona_router
app.include_router(persona_router)

View File

@@ -163,6 +163,7 @@ class BlogOptimizeResponse(BaseModel):
class BlogSEOAnalyzeRequest(BaseModel):
    # Blog content to analyse
    content: str
    # Target keywords; used to build fallback research data when research_data is absent
    keywords: List[str] = []
    # Optional research payload; when provided it is passed to the SEO analyzer in place of the keyword fallback
    research_data: Optional[Dict[str, Any]] = None
class BlogSEOAnalyzeResponse(BaseModel):

View File

@@ -0,0 +1,209 @@
"""
Blog Rewriter Service
Handles blog rewriting based on user feedback using structured AI calls.
"""
import time
import uuid
from typing import Dict, Any
from loguru import logger
from services.llm_providers.gemini_provider import gemini_structured_json_response
class BlogRewriter:
    """Service for rewriting blog content based on user feedback.

    Rewrites run as background tasks through the injected task manager: callers
    receive a task id from ``start_blog_rewrite`` and the outcome is reported via
    ``task_manager.update_task_status``.
    """

    def __init__(self, task_manager):
        """Store the task manager used to launch rewrites and report their status."""
        self.task_manager = task_manager

    def start_blog_rewrite(self, request: Dict[str, Any]) -> str:
        """Validate a rewrite request and launch it as a background task.

        Args:
            request: Dict with ``sections`` and ``feedback`` (both required) and
                optional ``title``, ``research``, ``outline``, ``tone``,
                ``audience`` and ``focus``.

        Returns:
            The id of the started task, formatted ``rewrite_<unix time>_<8 hex chars>``.

        Raises:
            ValueError: If no sections are given, or the feedback is missing or
                shorter than 10 characters after stripping whitespace.
        """
        try:
            # Extract request data
            title = request.get("title", "Untitled Blog")
            sections = request.get("sections", [])
            research = request.get("research", {})
            outline = request.get("outline", [])
            feedback = request.get("feedback", "")
            tone = request.get("tone")
            audience = request.get("audience")
            focus = request.get("focus")

            if not sections:
                raise ValueError("No sections provided for rewrite")
            if not feedback or len(feedback.strip()) < 10:
                raise ValueError("Feedback is required and must be at least 10 characters")

            # Timestamp plus random suffix keeps ids unique and roughly sortable
            task_id = f"rewrite_{int(time.time())}_{uuid.uuid4().hex[:8]}"

            # Start the rewrite task
            self.task_manager.start_task(
                task_id,
                self._execute_blog_rewrite,
                title=title,
                sections=sections,
                research=research,
                outline=outline,
                feedback=feedback,
                tone=tone,
                audience=audience,
                focus=focus,
            )
            logger.info(f"Blog rewrite task started: {task_id}")
            return task_id
        except Exception as e:
            logger.error(f"Failed to start blog rewrite: {e}")
            raise

    @staticmethod
    def _build_prompts(title, sections, feedback, tone, audience, focus):
        """Return ``(system_prompt, rewrite_prompt)`` for the given blog and feedback."""
        system_prompt = f"""You are an expert blog writer tasked with rewriting content based on user feedback.
Current Blog Title: {title}
User Feedback: {feedback}
{f"Desired Tone: {tone}" if tone else ""}
{f"Target Audience: {audience}" if audience else ""}
{f"Focus Area: {focus}" if focus else ""}
Your task is to rewrite the blog content to address the user's feedback while maintaining the core structure and research insights."""

        # Flatten the current blog into plain text for the prompt
        full_content = f"Title: {title}\n\n" + "".join(
            f"Section: {section.get('heading', 'Untitled')}\n"
            f"Content: {section.get('content', '')}\n\n"
            for section in sections
        )

        rewrite_prompt = f"""
Based on the user feedback and current blog content, rewrite the blog to address their concerns and preferences.
Current Content:
{full_content}
User Feedback: {feedback}
{f"Desired Tone: {tone}" if tone else ""}
{f"Target Audience: {audience}" if audience else ""}
{f"Focus Area: {focus}" if focus else ""}
Please rewrite the blog content in the following JSON format:
{{
"title": "New or improved blog title",
"sections": [
{{
"id": "section_id",
"heading": "Section heading",
"content": "Rewritten section content"
}}
]
}}
Guidelines:
1. Address the user's feedback directly
2. Maintain the research insights and factual accuracy
3. Improve flow, clarity, and engagement
4. Keep the same section structure unless feedback suggests otherwise
5. Ensure content is well-formatted with proper paragraphs
"""
        return system_prompt, rewrite_prompt

    @staticmethod
    def _normalize_result(result):
        """Return the AI result in multi-section form, or ``None`` if it is unusable.

        Accepts either ``{"title", "sections"}`` or a single-section
        ``{"heading"/"title", "content"}`` response; the latter is wrapped into a
        one-element ``sections`` list.
        """
        if not result or result.get("error"):
            return None
        if result.get("title") and result.get("sections"):
            return result
        if (result.get("heading") or result.get("title")) and result.get("content"):
            return {
                "title": result.get("heading") or result.get("title") or "Rewritten Blog",
                "sections": [
                    {
                        "id": result.get("id") or "section_1",
                        "heading": result.get("heading") or "Main Content",
                        "content": result.get("content", ""),
                    }
                ],
            }
        return None

    @staticmethod
    def _describe_failure(result) -> str:
        """Explain why an AI result was rejected by ``_normalize_result``."""
        if not result:
            return "No response from AI"
        if result.get("error"):
            return f"AI error: {result.get('error')}"
        if not (result.get("title") or result.get("heading")):
            return "AI response missing title/heading"
        if not (result.get("sections") or result.get("content")):
            return "AI response missing sections/content"
        return "AI response has invalid structure"

    async def _execute_blog_rewrite(self, task_id: str, **kwargs):
        """Execute the blog rewrite task and record its outcome on the task manager.

        Args:
            task_id: Id of the task whose status is updated.
            **kwargs: The keyword arguments passed by ``start_blog_rewrite``
                (``title``, ``sections``, ``feedback``, ``tone``, ``audience``,
                ``focus``; ``research`` and ``outline`` are accepted but unused).

        Raises:
            Exception: Any unexpected error is recorded as a failed task and re-raised.
        """
        import asyncio
        import functools

        try:
            title = kwargs.get("title", "Untitled Blog")
            sections = kwargs.get("sections", [])
            feedback = kwargs.get("feedback", "")
            tone = kwargs.get("tone")
            audience = kwargs.get("audience")
            focus = kwargs.get("focus")

            self.task_manager.update_task_status(task_id, "processing", "Analyzing current content and feedback...")
            system_prompt, rewrite_prompt = self._build_prompts(
                title, sections, feedback, tone, audience, focus
            )

            self.task_manager.update_task_status(task_id, "processing", "Generating rewritten content...")

            # Schema for structured JSON generation
            schema = {
                "type": "object",
                "properties": {
                    "title": {"type": "string"},
                    "sections": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "id": {"type": "string"},
                                "heading": {"type": "string"},
                                "content": {"type": "string"},
                            },
                        },
                    },
                },
            }

            # The provider call is synchronous; run it in a worker thread so the
            # event loop stays free to serve status polls during generation.
            generate = functools.partial(
                gemini_structured_json_response,
                prompt=rewrite_prompt,
                schema=schema,
                temperature=0.7,
                max_tokens=4096,
                system_prompt=system_prompt,
            )
            result = await asyncio.get_running_loop().run_in_executor(None, generate)
            logger.info(f"Gemini response for rewrite task {task_id}: {result}")

            normalized = self._normalize_result(result)
            if normalized is None:
                error_msg = self._describe_failure(result)
                self.task_manager.update_task_status(task_id, "failed", f"Rewrite failed: {error_msg}")
                logger.error(f"Blog rewrite failed: {error_msg}")
                return

            if normalized is not result:
                logger.info(f"Converted single section response to multi-section format for task {task_id}")

            self.task_manager.update_task_status(
                task_id,
                "completed",
                "Blog rewrite completed successfully!",
                result=normalized,
            )
            logger.info(f"Blog rewrite completed successfully: {task_id}")
        except Exception as e:
            error_msg = f"Blog rewrite error: {str(e)}"
            self.task_manager.update_task_status(task_id, "failed", error_msg)
            logger.error(f"Blog rewrite task failed: {e}")
            raise

View File

@@ -0,0 +1,237 @@
"""
Medium Blog Generator Service
Handles generation of medium-length blogs (≤1000 words) using structured AI calls.
"""
import time
import json
from typing import Dict, Any, List
from loguru import logger
from models.blog_models import (
MediumBlogGenerateRequest,
MediumBlogGenerateResult,
MediumGeneratedSection,
ResearchSource,
)
from services.llm_providers.gemini_provider import gemini_structured_json_response
from services.cache.persistent_content_cache import persistent_content_cache
class MediumBlogGenerator:
    """Service for generating medium-length blog content using structured AI calls."""

    def __init__(self):
        """Bind the shared persistent content cache used to skip repeat generations."""
        self.cache = persistent_content_cache

    @staticmethod
    def _outline_fields(section) -> Dict[str, Any]:
        """Collect a section's outline fields, accepting snake_case or camelCase attributes."""
        return {
            "keyPoints": getattr(section, "key_points", []) or getattr(section, "keyPoints", []),
            "subheadings": getattr(section, "subheadings", []),
            "keywords": getattr(section, "keywords", []),
            "targetWords": getattr(section, "target_words", None) or getattr(section, "targetWords", None),
        }

    async def generate_medium_blog_with_progress(self, req: MediumBlogGenerateRequest, task_id: str) -> MediumBlogGenerateResult:
        """Use Gemini structured JSON to generate a medium-length blog in one call.

        A cached result is returned when one exists for the same keywords,
        sections, word target, persona, tone and audience; otherwise the content
        is generated and then cached on a best-effort basis.

        Args:
            req: Generation request (title, sections, persona, tone, audience, ...).
            task_id: Id of the calling task; accepted for interface compatibility
                and not used by this method.

        Returns:
            MediumBlogGenerateResult with the generated (or cached) sections.

        Raises:
            Exception: If the model returns nothing or reports an error.
        """
        start = time.time()

        keywords = req.researchKeywords or []
        target_words = req.globalTargetWords or 1000
        persona_data = req.persona.dict() if req.persona else None
        # Lookup and store must use identical values, otherwise entries written
        # for requests without a tone/audience could never be found again.
        cache_tone = req.tone or "professional"
        cache_audience = req.audience or "general"

        # Prepare sections data for cache key generation
        sections_for_cache = [
            {"id": s.id, "heading": s.heading, **self._outline_fields(s)}
            for s in req.sections
        ]

        # Check cache first
        cached_result = self.cache.get_cached_content(
            keywords=keywords,
            sections=sections_for_cache,
            global_target_words=target_words,
            persona_data=persona_data,
            tone=cache_tone,
            audience=cache_audience,
        )
        if cached_result:
            logger.info(f"Using cached content for keywords: {req.researchKeywords} (saved expensive generation)")
            # Add cache hit marker to distinguish from fresh generation
            cached_result['generation_time_ms'] = 0  # Mark as cache hit
            cached_result['cache_hit'] = True
            return MediumBlogGenerateResult(**cached_result)

        # Cache miss - proceed with AI generation
        logger.info(f"Cache miss - generating new content for keywords: {req.researchKeywords}")

        # Build schema expected from the model
        schema = {
            "type": "object",
            "properties": {
                "title": {"type": "string"},
                "sections": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "id": {"type": "string"},
                            "heading": {"type": "string"},
                            "content": {"type": "string"},
                            "wordCount": {"type": "number"},
                            "sources": {
                                "type": "array",
                                "items": {
                                    "type": "object",
                                    "properties": {"title": {"type": "string"}, "url": {"type": "string"}},
                                },
                            },
                        },
                    },
                },
            },
        }

        # Compose prompt payload: one outline block per requested section
        def section_block(s):
            return {
                "id": s.id,
                "heading": s.heading,
                "outline": {
                    **self._outline_fields(s),
                    "references": [
                        {"title": r.title, "url": r.url} for r in getattr(s, "references", [])
                    ],
                },
            }

        payload = {
            "title": req.title,
            "globalTargetWords": target_words,
            "persona": persona_data,
            "tone": req.tone,
            "audience": req.audience,
            "sections": [section_block(s) for s in req.sections],
        }

        # Build persona-aware system prompt
        persona_context = ""
        if req.persona:
            persona_context = f"""
PERSONA GUIDELINES:
- Industry: {req.persona.industry or 'General'}
- Tone: {req.persona.tone or 'Professional'}
- Audience: {req.persona.audience or 'General readers'}
- Persona ID: {req.persona.persona_id or 'Default'}
Write content that reflects this persona's expertise and communication style.
Use industry-specific terminology and examples where appropriate.
Maintain consistent voice and authority throughout all sections.
"""
        system = (
            "You are a professional blog writer with deep expertise in your field. "
            "Generate high-quality, persona-driven content for each section based on the provided outline. "
            "Write engaging, informative content that follows the section's key points and target word count. "
            "Ensure the content flows naturally and maintains consistent voice and authority. "
            "Format content with proper paragraph breaks using double line breaks (\\n\\n) between paragraphs. "
            "Structure content with clear paragraphs - aim for 2-4 sentences per paragraph. "
            f"{persona_context}"
            "Return ONLY valid JSON with no markdown formatting or explanations."
        )

        # Build persona-specific content instructions
        persona_instructions = ""
        if req.persona:
            industry = req.persona.industry or 'General'
            tone = req.persona.tone or 'Professional'
            audience = req.persona.audience or 'General readers'
            persona_instructions = f"""
PERSONA-DRIVEN CONTENT REQUIREMENTS:
- Write as an expert in {industry} industry
- Use {tone} tone appropriate for {audience}
- Include industry-specific examples and terminology
- Demonstrate authority and expertise in the field
- Use language that resonates with {audience}
- Maintain consistent voice that reflects this persona's expertise
"""
        prompt = (
            # The word target is for the whole blog, not for each section.
            f"Write blog content for the following sections. The blog should be about {target_words} words in total, distributed across all sections.\n\n"
            f"Blog Title: {req.title}\n\n"
            "For each section, write engaging content that:\n"
            "- Follows the key points provided\n"
            "- Uses the suggested keywords naturally\n"
            "- Meets the target word count\n"
            "- Maintains professional tone\n"
            "- References the provided sources when relevant\n"
            "- Breaks content into clear paragraphs (2-4 sentences each)\n"
            "- Uses double line breaks (\\n\\n) between paragraphs for proper formatting\n"
            "- Starts with an engaging opening paragraph\n"
            "- Ends with a strong concluding paragraph\n"
            f"{persona_instructions}\n"
            "IMPORTANT: Format the 'content' field with proper paragraph breaks using \\n\\n between paragraphs.\n\n"
            "Return a JSON object with 'title' and 'sections' array. Each section should have 'id', 'heading', 'content', and 'wordCount'.\n\n"
            f"Sections to write:\n{json.dumps(payload, ensure_ascii=False, indent=2)}"
        )

        # NOTE(review): this call is not awaited, so it runs synchronously inside
        # this coroutine - confirm that blocking here is acceptable for callers.
        ai_resp = gemini_structured_json_response(
            prompt=prompt,
            schema=schema,
            temperature=0.2,
            max_tokens=8192,
            system_prompt=system,
        )

        # Check for errors in AI response
        if not ai_resp or ai_resp.get("error"):
            error_msg = ai_resp.get("error", "Empty generation result from model") if ai_resp else "No response from model"
            logger.error(f"AI generation failed: {error_msg}")
            raise Exception(f"AI generation failed: {error_msg}")

        # Normalize output; missing fields fall back to empty values
        title = ai_resp.get("title") or req.title
        out_sections = [
            MediumGeneratedSection(
                id=str(s.get("id")),
                heading=s.get("heading") or "",
                content=s.get("content") or "",
                wordCount=int(s.get("wordCount") or 0),
                sources=[
                    # map to ResearchSource shape if possible; keep minimal
                    ResearchSource(title=src.get("title", ""), url=src.get("url", ""))
                    for src in (s.get("sources") or [])
                ] or None,
            )
            for s in ai_resp.get("sections", []) or []
        ]

        duration_ms = int((time.time() - start) * 1000)
        result = MediumBlogGenerateResult(
            success=True,
            title=title,
            sections=out_sections,
            model="gemini-2.5-flash",
            generation_time_ms=duration_ms,
            safety_flags=None,
        )

        # Cache the result for future use
        try:
            self.cache.cache_content(
                keywords=keywords,
                sections=sections_for_cache,
                global_target_words=target_words,
                persona_data=persona_data,
                tone=cache_tone,
                audience=cache_audience,
                result=result.dict(),
            )
            logger.info(f"Cached content result for keywords: {req.researchKeywords}")
        except Exception as cache_error:
            # Don't fail the entire operation if caching fails
            logger.warning(f"Failed to cache content result: {cache_error}")

        return result

View File

@@ -5,6 +5,8 @@ Coordinates research, outline generation, content creation, and optimization.
"""
from typing import Dict, Any, List
import time
import uuid
from loguru import logger
from models.blog_models import (
@@ -30,6 +32,8 @@ from models.blog_models import (
from ..research import ResearchService
from ..outline import OutlineService
from ..content.enhanced_content_generator import EnhancedContentGenerator
from ..content.medium_blog_generator import MediumBlogGenerator
from ..content.blog_rewriter import BlogRewriter
from services.llm_providers.gemini_provider import gemini_structured_json_response
from services.cache.persistent_content_cache import persistent_content_cache
from models.blog_models import (
@@ -38,6 +42,47 @@ from models.blog_models import (
MediumGeneratedSection,
)
# Import task manager - we'll create a simple one for this service
class SimpleTaskManager:
    """Simple in-memory task manager for BlogWriterService.

    Task records live in ``self.tasks`` keyed by task id; each record is a dict
    with ``status``, ``progress``, ``result`` and ``error``. Nothing is persisted.
    """

    def __init__(self):
        self.tasks = {}
        # Strong references to in-flight asyncio tasks. The event loop only keeps
        # weak references, so an unreferenced task may be garbage-collected
        # before it finishes.
        self._background_tasks = set()

    def start_task(self, task_id: str, func, **kwargs):
        """Register ``task_id`` and run ``func(task_id, **kwargs)`` in the background.

        Must be called while an event loop is running, since the work is
        scheduled with ``asyncio.create_task``.

        Args:
            task_id: Key under which the task record is stored.
            func: Coroutine function called as ``func(task_id, **kwargs)``.
            **kwargs: Keyword arguments forwarded to ``func``.
        """
        import asyncio

        self.tasks[task_id] = {
            "status": "running",
            "progress": "Starting...",
            "result": None,
            "error": None
        }
        # Start the task in the background and hold it until it completes
        background = asyncio.create_task(self._run_task(task_id, func, **kwargs))
        self._background_tasks.add(background)
        background.add_done_callback(self._background_tasks.discard)

    async def _run_task(self, task_id: str, func, **kwargs):
        """Run the task function, recording any exception on the task record."""
        try:
            await func(task_id, **kwargs)
        except Exception as e:
            self.tasks[task_id]["status"] = "failed"
            self.tasks[task_id]["error"] = str(e)
            logger.error(f"Task {task_id} failed: {e}")

    def update_task_status(self, task_id: str, status: str, progress: str = None, result=None):
        """Update the record of a known task; unknown ids are ignored.

        Args:
            task_id: Id of the task to update.
            status: New status value.
            progress: Optional progress message; left unchanged when empty.
            result: Optional result payload; left unchanged when ``None``.
        """
        if task_id in self.tasks:
            self.tasks[task_id]["status"] = status
            if progress:
                self.tasks[task_id]["progress"] = progress
            # Compare against None so legitimate falsy results ({}, [], 0) are stored
            if result is not None:
                self.tasks[task_id]["result"] = result

    def get_task_status(self, task_id: str):
        """Return the task record, or ``{"status": "not_found"}`` for an unknown id."""
        return self.tasks.get(task_id, {"status": "not_found"})
class BlogWriterService:
"""Main service orchestrator for AI Blog Writer functionality."""
@@ -46,6 +91,9 @@ class BlogWriterService:
self.research_service = ResearchService()
self.outline_service = OutlineService()
self.content_generator = EnhancedContentGenerator()
self.task_manager = SimpleTaskManager()
self.medium_blog_generator = MediumBlogGenerator()
self.blog_rewriter = BlogRewriter(self.task_manager)
# Research Methods
async def research(self, request: BlogResearchRequest) -> BlogResearchResponse:
@@ -157,98 +205,67 @@ class BlogWriterService:
return {"success": False, "error": str(e)}
async def seo_analyze(self, request: BlogSEOAnalyzeRequest) -> BlogSEOAnalyzeResponse:
"""Analyze content for SEO optimization."""
from services.seo_tools.on_page_seo_service import OnPageSEOService
from services.seo_tools.image_alt_service import ImageAltService
from services.seo_tools.content_strategy_service import ContentStrategyService
content = request.content or ""
target_keywords = request.keywords or []
# On-page analysis (treat content as a virtual URL/document for now)
on_page = OnPageSEOService()
on_page_result = await on_page.analyze_on_page_seo(url="about:blank", target_keywords=target_keywords)
# Image alt coverage (placeholder: no images in raw content yet)
"""Analyze content for SEO optimization using comprehensive blog-specific analyzer."""
try:
image_alt_service = ImageAltService()
image_alt_status = {"total_images": 0, "missing_alt": 0}
except Exception:
image_alt_status = {"total_images": 0, "missing_alt": 0}
from services.blog_writer.seo.blog_content_seo_analyzer import BlogContentSEOAnalyzer
# Strategy hints (keywords/topics)
try:
strategy = ContentStrategyService()
strategy_hints = await strategy.analyze_content_topics(content=content)
except Exception:
strategy_hints = {"topics": [], "gaps": []}
content = request.content or ""
target_keywords = request.keywords or []
# Lightweight markdown parsing for headings/links/keywords
import re
content_text = content or ""
words = re.findall(r"[A-Za-z0-9']+", content_text)
total_words = max(len(words), 1)
heading_lines = content_text.splitlines()
h1 = sum(1 for ln in heading_lines if ln.startswith('# '))
h2 = sum(1 for ln in heading_lines if ln.startswith('## '))
h3 = sum(1 for ln in heading_lines if ln.startswith('### '))
md_links = re.findall(r"\[([^\]]+)\]\(([^)]+)\)", content_text)
external_links = [u for (_t, u) in md_links if u.startswith('http')]
# Keyword density
density_map: Dict[str, Any] = {"target_keywords": target_keywords}
for kw in target_keywords:
try:
occurrences = len(re.findall(re.escape(kw), content_text, flags=re.IGNORECASE))
except re.error:
occurrences = 0
density_map[kw] = {
"occurrences": occurrences,
"density": round(occurrences / total_words, 4)
}
# Build unified response
recommendations: List[str] = []
if isinstance(on_page_result.get("recommendations"), list):
recommendations.extend(on_page_result["recommendations"])
if strategy_hints.get("gaps"):
recommendations.append("Cover missing topics: " + ", ".join(strategy_hints["gaps"]))
if not external_links:
recommendations.append("Add at least one credible external link to authoritative sources.")
if h2 < 2:
recommendations.append("Increase number of H2 sections for better structure.")
# Internal link suggestions: generate anchors for H2s and propose cross-links
def to_anchor(h: str) -> str:
import re
a = re.sub(r"[^a-z0-9\s-]", "", h.lower())
a = re.sub(r"\s+", "-", a).strip('-')
return a
h2_headings = [ln[3:].strip() for ln in heading_lines if ln.startswith('## ')]
anchors = [to_anchor(h) for h in h2_headings]
internal_link_suggestions = []
for i in range(len(anchors)-1):
internal_link_suggestions.append({
"from": h2_headings[i],
"to": h2_headings[i+1],
"anchor": f"#{anchors[i+1]}",
"suggestion": f"Add internal link from '{h2_headings[i]}' to '{h2_headings[i+1]}'"
})
return BlogSEOAnalyzeResponse(
success=True,
seo_score=float(on_page_result.get("overall_score", 75)),
density=density_map,
structure={
**on_page_result.get("heading_structure", {}),
"markdown_headings": {"h1": h1, "h2": h2, "h3": h3},
"links": {"total": len(md_links), "external": len(external_links)}
},
readability=on_page_result.get("content_analysis", {}),
link_suggestions=([{"suggestion": "Add external citation links for key claims."}] if not external_links else []) + internal_link_suggestions,
image_alt_status=image_alt_status,
recommendations=recommendations,
)
# Use research data from request if available, otherwise create fallback
if request.research_data:
research_data = request.research_data
logger.info(f"Using research data from request: {research_data.get('keyword_analysis', {})}")
else:
# Fallback for backward compatibility
research_data = {
"keyword_analysis": {
"primary": target_keywords,
"long_tail": [],
"semantic": [],
"all_keywords": target_keywords,
"search_intent": "informational"
}
}
logger.warning("No research data provided, using fallback keywords")
# Use our comprehensive SEO analyzer
analyzer = BlogContentSEOAnalyzer()
analysis_results = await analyzer.analyze_blog_content(content, research_data)
# Convert results to response format
recommendations = analysis_results.get('actionable_recommendations', [])
# Convert recommendation objects to strings
recommendation_strings = []
for rec in recommendations:
if isinstance(rec, dict):
recommendation_strings.append(f"[{rec.get('category', 'General')}] {rec.get('recommendation', '')}")
else:
recommendation_strings.append(str(rec))
return BlogSEOAnalyzeResponse(
success=True,
seo_score=float(analysis_results.get('overall_score', 0)),
density=analysis_results.get('visualization_data', {}).get('keyword_analysis', {}).get('densities', {}),
structure=analysis_results.get('detailed_analysis', {}).get('content_structure', {}),
readability=analysis_results.get('detailed_analysis', {}).get('readability_analysis', {}),
link_suggestions=[],
image_alt_status={"total_images": 0, "missing_alt": 0},
recommendations=recommendation_strings
)
except Exception as e:
logger.error(f"SEO analysis failed: {e}")
return BlogSEOAnalyzeResponse(
success=False,
seo_score=0.0,
density={},
structure={},
readability={},
link_suggestions=[],
image_alt_status={"total_images": 0, "missing_alt": 0},
recommendations=[f"SEO analysis failed: {str(e)}"]
)
async def seo_metadata(self, request: BlogSEOMetadataRequest) -> BlogSEOMetadataResponse:
"""Generate SEO metadata for content."""
@@ -269,177 +286,171 @@ class BlogWriterService:
async def generate_medium_blog_with_progress(self, req: MediumBlogGenerateRequest, task_id: str) -> MediumBlogGenerateResult:
    """Generate a medium-length blog by delegating to the medium blog generator.

    Args:
        req: The generation request, forwarded unchanged.
        task_id: Identifier of the task, forwarded unchanged.

    Returns:
        Whatever ``self.medium_blog_generator.generate_medium_blog_with_progress``
        returns for this request.
    """
    # The local clock read that used to live here was dead code: nothing in
    # this method consumed it once the work moved into the generator.
    return await self.medium_blog_generator.generate_medium_blog_with_progress(req, task_id)
# Prepare sections data for cache key generation
sections_for_cache = []
for s in req.sections:
sections_for_cache.append({
"id": s.id,
"heading": s.heading,
"keyPoints": getattr(s, "key_points", []) or getattr(s, "keyPoints", []),
"subheadings": getattr(s, "subheadings", []),
"keywords": getattr(s, "keywords", []),
"targetWords": getattr(s, "target_words", None) or getattr(s, "targetWords", None),
})
# Check cache first
cached_result = persistent_content_cache.get_cached_content(
keywords=req.researchKeywords or [],
sections=sections_for_cache,
global_target_words=req.globalTargetWords or 1000,
persona_data=req.persona.dict() if req.persona else None,
tone=req.tone,
audience=req.audience
)
if cached_result:
logger.info(f"Using cached content for keywords: {req.researchKeywords} (saved expensive generation)")
# Add cache hit marker to distinguish from fresh generation
cached_result['generation_time_ms'] = 0 # Mark as cache hit
cached_result['cache_hit'] = True
return MediumBlogGenerateResult(**cached_result)
# Cache miss - proceed with AI generation
logger.info(f"Cache miss - generating new content for keywords: {req.researchKeywords}")
# Build schema expected from the model
schema = {
"type": "object",
"properties": {
"title": {"type": "string"},
"sections": {
"type": "array",
"items": {
async def analyze_flow_basic(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze flow metrics for entire blog using single AI call (cost-effective)."""
try:
# Extract blog content from request
sections = request.get("sections", [])
title = request.get("title", "Untitled Blog")
if not sections:
return {"error": "No sections provided for analysis"}
# Combine all content for analysis
full_content = f"Title: {title}\n\n"
for section in sections:
full_content += f"Section: {section.get('heading', 'Untitled')}\n"
full_content += f"Content: {section.get('content', '')}\n\n"
# Build analysis prompt
system_prompt = """You are an expert content analyst specializing in narrative flow, consistency, and progression analysis.
Analyze the provided blog content and provide detailed, actionable feedback for improvement.
Focus on how well the content flows from section to section, maintains consistency in tone and style,
and progresses logically through the topic."""
analysis_prompt = f"""
Analyze the following blog content for narrative flow, consistency, and progression:
{full_content}
Evaluate each section and provide overall analysis with specific scores and actionable suggestions.
Consider:
- How well each section flows into the next
- Consistency in tone, style, and voice throughout
- Logical progression of ideas and arguments
- Transition quality between sections
- Overall coherence and readability
IMPORTANT: For each section in the response, use the exact section ID provided in the input.
The section IDs in your response must match the section IDs from the input exactly.
Provide detailed analysis with specific, actionable suggestions for improvement.
"""
# Use Gemini for structured analysis
from services.llm_providers.gemini_provider import gemini_structured_json_response
schema = {
"type": "object",
"properties": {
"overall_flow_score": {"type": "number", "minimum": 0.0, "maximum": 1.0},
"overall_consistency_score": {"type": "number", "minimum": 0.0, "maximum": 1.0},
"overall_progression_score": {"type": "number", "minimum": 0.0, "maximum": 1.0},
"overall_coherence_score": {"type": "number", "minimum": 0.0, "maximum": 1.0},
"sections": {
"type": "array",
"items": {
"type": "object",
"properties": {
"section_id": {"type": "string"},
"heading": {"type": "string"},
"flow_score": {"type": "number", "minimum": 0.0, "maximum": 1.0},
"consistency_score": {"type": "number", "minimum": 0.0, "maximum": 1.0},
"progression_score": {"type": "number", "minimum": 0.0, "maximum": 1.0},
"coherence_score": {"type": "number", "minimum": 0.0, "maximum": 1.0},
"transition_quality": {"type": "number", "minimum": 0.0, "maximum": 1.0},
"suggestions": {"type": "array", "items": {"type": "string"}},
"strengths": {"type": "array", "items": {"type": "string"}},
"improvement_areas": {"type": "array", "items": {"type": "string"}}
},
"required": ["section_id", "heading", "flow_score", "consistency_score", "progression_score", "coherence_score", "transition_quality", "suggestions"]
}
},
"overall_suggestions": {"type": "array", "items": {"type": "string"}},
"overall_strengths": {"type": "array", "items": {"type": "string"}},
"overall_improvement_areas": {"type": "array", "items": {"type": "string"}},
"transition_analysis": {
"type": "object",
"properties": {
"id": {"type": "string"},
"heading": {"type": "string"},
"content": {"type": "string"},
"wordCount": {"type": "number"},
"sources": {
"type": "array",
"items": {
"type": "object",
"properties": {"title": {"type": "string"}, "url": {"type": "string"}},
},
},
},
},
},
},
}
# Compose prompt
def section_block(s):
return {
"id": s.id,
"heading": s.heading,
"outline": {
"keyPoints": getattr(s, "key_points", []) or getattr(s, "keyPoints", []),
"subheadings": getattr(s, "subheadings", []),
"keywords": getattr(s, "keywords", []),
"targetWords": getattr(s, "target_words", None) or getattr(s, "targetWords", None),
"references": [
{"title": r.title, "url": r.url} for r in getattr(s, "references", [])
],
"overall_transition_quality": {"type": "number", "minimum": 0.0, "maximum": 1.0},
"transition_suggestions": {"type": "array", "items": {"type": "string"}}
}
}
},
"required": ["overall_flow_score", "overall_consistency_score", "overall_progression_score", "overall_coherence_score", "sections", "overall_suggestions"]
}
payload = {
"title": req.title,
"globalTargetWords": req.globalTargetWords or 1000,
"persona": req.persona.dict() if req.persona else None,
"tone": req.tone,
"audience": req.audience,
"sections": [section_block(s) for s in req.sections],
}
system = (
"You are a professional blog writer. Generate high-quality content for each section based on the provided outline. "
"Write engaging, informative content that follows the section's key points and target word count. "
"Use a professional tone and ensure the content flows naturally. "
"Format content with proper paragraph breaks using double line breaks (\\n\\n) between paragraphs. "
"Structure content with clear paragraphs - aim for 2-4 sentences per paragraph. "
"Return ONLY valid JSON with no markdown formatting or explanations."
)
import json
prompt = (
f"Write blog content for the following sections. Each section should be {req.globalTargetWords or 1000} words total, distributed across all sections.\n\n"
f"Blog Title: {req.title}\n\n"
"For each section, write engaging content that:\n"
"- Follows the key points provided\n"
"- Uses the suggested keywords naturally\n"
"- Meets the target word count\n"
"- Maintains professional tone\n"
"- References the provided sources when relevant\n"
"- Breaks content into clear paragraphs (2-4 sentences each)\n"
"- Uses double line breaks (\\n\\n) between paragraphs for proper formatting\n"
"- Starts with an engaging opening paragraph\n"
"- Ends with a strong concluding paragraph\n\n"
"IMPORTANT: Format the 'content' field with proper paragraph breaks using \\n\\n between paragraphs.\n\n"
"Return a JSON object with 'title' and 'sections' array. Each section should have 'id', 'heading', 'content', and 'wordCount'.\n\n"
f"Sections to write:\n{json.dumps(payload, ensure_ascii=False, indent=2)}"
)
ai_resp = gemini_structured_json_response(
prompt=prompt,
schema=schema,
temperature=0.2,
max_tokens=8192,
system_prompt=system,
)
# Check for errors in AI response
if not ai_resp or ai_resp.get("error"):
error_msg = ai_resp.get("error", "Empty generation result from model") if ai_resp else "No response from model"
logger.error(f"AI generation failed: {error_msg}")
raise Exception(f"AI generation failed: {error_msg}")
# Normalize output
title = ai_resp.get("title") or req.title
out_sections = []
for s in ai_resp.get("sections", []) or []:
out_sections.append(
MediumGeneratedSection(
id=str(s.get("id")),
heading=s.get("heading") or "",
content=s.get("content") or "",
wordCount=int(s.get("wordCount") or 0),
sources=[
# map to ResearchSource shape if possible; keep minimal
ResearchSource(title=src.get("title", ""), url=src.get("url", ""))
for src in (s.get("sources") or [])
] or None,
)
result = gemini_structured_json_response(
prompt=analysis_prompt,
schema=schema,
temperature=0.3,
max_tokens=4096,
system_prompt=system_prompt
)
if result and not result.get("error"):
logger.info("Basic flow analysis completed successfully")
return {"success": True, "analysis": result, "mode": "basic"}
else:
error_msg = result.get("error", "Analysis failed") if result else "No response from AI"
logger.error(f"Basic flow analysis failed: {error_msg}")
return {"error": error_msg}
except Exception as e:
logger.error(f"Basic flow analysis error: {e}")
return {"error": str(e)}
duration_ms = int((time.time() - start) * 1000)
result = MediumBlogGenerateResult(
success=True,
title=title,
sections=out_sections,
model="gemini-2.5-flash",
generation_time_ms=duration_ms,
safety_flags=None,
)
# Cache the result for future use
async def analyze_flow_advanced(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze flow metrics for each section individually (detailed but expensive)."""
try:
persistent_content_cache.cache_content(
keywords=req.researchKeywords or [],
sections=sections_for_cache,
global_target_words=req.globalTargetWords or 1000,
persona_data=req.persona.dict() if req.persona else None,
tone=req.tone or "professional",
audience=req.audience or "general",
result=result.dict()
)
logger.info(f"Cached content result for keywords: {req.researchKeywords}")
except Exception as cache_error:
logger.warning(f"Failed to cache content result: {cache_error}")
# Don't fail the entire operation if caching fails
return result
# Use the existing enhanced content generator for detailed analysis
sections = request.get("sections", [])
title = request.get("title", "Untitled Blog")
if not sections:
return {"error": "No sections provided for analysis"}
results = []
for section in sections:
# Use the existing flow analyzer for each section
section_content = section.get("content", "")
section_heading = section.get("heading", "Untitled")
# Get previous section context for better analysis
prev_section_content = ""
if len(results) > 0:
prev_section_content = results[-1].get("content", "")
# Use the existing flow analyzer
flow_metrics = self.content_generator.flow.assess_flow(
prev_section_content,
section_content,
use_llm=True
)
results.append({
"section_id": section.get("id", "unknown"),
"heading": section_heading,
"flow_score": flow_metrics.get("flow", 0.0),
"consistency_score": flow_metrics.get("consistency", 0.0),
"progression_score": flow_metrics.get("progression", 0.0),
"detailed_analysis": flow_metrics.get("analysis", ""),
"suggestions": flow_metrics.get("suggestions", [])
})
# Calculate overall scores
overall_flow = sum(r["flow_score"] for r in results) / len(results) if results else 0.0
overall_consistency = sum(r["consistency_score"] for r in results) / len(results) if results else 0.0
overall_progression = sum(r["progression_score"] for r in results) / len(results) if results else 0.0
logger.info("Advanced flow analysis completed successfully")
return {
"success": True,
"analysis": {
"overall_flow_score": overall_flow,
"overall_consistency_score": overall_consistency,
"overall_progression_score": overall_progression,
"sections": results
},
"mode": "advanced"
}
except Exception as e:
logger.error(f"Advanced flow analysis error: {e}")
return {"error": str(e)}
def start_blog_rewrite(self, request: Dict[str, Any]) -> str:
    """Hand a rewrite request to the blog rewriter and return its result.

    Args:
        request: Rewrite request payload, forwarded unchanged.

    Returns:
        The value produced by ``self.blog_rewriter.start_blog_rewrite``
        (the route handler exposes it as ``task_id``).
    """
    rewrite_task_id = self.blog_rewriter.start_blog_rewrite(request)
    return rewrite_task_id

View File

@@ -0,0 +1,872 @@
"""
Blog Content SEO Analyzer
Specialized SEO analyzer for blog content with parallel processing.
Leverages existing non-AI SEO tools and uses single AI prompt for structured analysis.
"""
import asyncio
import re
import textstat
from datetime import datetime
from typing import Dict, Any, List, Optional
from loguru import logger
from services.seo_analyzer import (
ContentAnalyzer, KeywordAnalyzer,
URLStructureAnalyzer, AIInsightGenerator
)
from services.llm_providers.gemini_provider import gemini_structured_json_response
class BlogContentSEOAnalyzer:
"""Specialized SEO analyzer for blog content with parallel processing"""
def __init__(self):
    """Initialize the blog content SEO analyzer.

    Creates one instance of each collaborator imported from
    ``services.seo_analyzer`` and keeps a reference to the Gemini
    structured-JSON helper so it can be called through ``self``.
    """
    self.content_analyzer = ContentAnalyzer()
    self.keyword_analyzer = KeywordAnalyzer()
    self.url_analyzer = URLStructureAnalyzer()
    self.ai_insights = AIInsightGenerator()
    # Stored as a callable (not called here); used later for the AI analysis.
    self.gemini_provider = gemini_structured_json_response
    logger.info("BlogContentSEOAnalyzer initialized")
async def analyze_blog_content(self, blog_content: str, research_data: Dict[str, Any]) -> Dict[str, Any]:
    """Run the full SEO analysis pipeline for one blog post.

    The pipeline extracts keywords from the research data, awaits the
    non-AI analyzers, awaits the AI analysis, and compiles everything
    into a single result dictionary.

    Args:
        blog_content: The blog content to analyze.
        research_data: Research data containing keywords and other insights.

    Returns:
        The dictionary produced by ``_compile_blog_seo_results``.

    Raises:
        Exception: Any failure in a pipeline step is logged and re-raised;
            no fallback result is produced.
    """
    try:
        logger.info("Starting blog content SEO analysis")

        # Step 0: keywords drive both the deterministic and the AI phases.
        keywords = self._extract_keywords_from_research(research_data)
        logger.info(f"Extracted keywords: {keywords}")

        # Phase 1: deterministic analyzers.
        logger.info("Running non-AI analyzers in parallel")
        deterministic = await self._run_non_ai_analyzers(blog_content, keywords)

        # Phase 2: one AI call that also receives the phase 1 output.
        logger.info("Running AI analysis")
        insights = await self._run_ai_analysis(blog_content, keywords, deterministic)

        # Phase 3: merge everything into the final report.
        logger.info("Compiling results")
        compiled = self._compile_blog_seo_results(deterministic, insights, keywords)

        logger.info(f"SEO analysis completed. Overall score: {compiled.get('overall_score', 0)}")
        return compiled
    except Exception as exc:
        logger.error(f"Blog SEO analysis failed: {exc}")
        # Fail fast: callers decide how to report the failure.
        raise exc
def _extract_keywords_from_research(self, research_data: Dict[str, Any]) -> Dict[str, Any]:
    """Normalize the keyword information found in ``research_data``.

    Primary keywords come from ``keyword_analysis['primary']`` when that key
    exists, otherwise from a top-level ``'keywords'`` entry, otherwise they
    are empty. Semantic keywords accept either the ``'semantic'`` or the
    ``'semantic_keywords'`` field name.

    Args:
        research_data: Research payload; its ``'keyword_analysis'`` entry is
            read with dictionary access.

    Returns:
        A dict with the keys ``primary``, ``long_tail``, ``semantic``,
        ``all_keywords`` and ``search_intent``.

    Raises:
        ValueError: If anything goes wrong while reading the research data.
    """
    try:
        logger.info(f"Extracting keywords from research data: {research_data}")

        analysis = research_data.get('keyword_analysis', {})
        logger.info(f"Found keyword_analysis: {analysis}")

        # Pick the primary keywords from the first location that provides them.
        if 'primary' in analysis:
            primary = analysis.get('primary', [])
        elif 'keywords' in research_data:
            primary = research_data.get('keywords', [])
        else:
            primary = []

        long_tail = analysis.get('long_tail', [])
        semantic = analysis.get('semantic', []) or analysis.get('semantic_keywords', [])
        # Without an explicit combined list, the primary keywords stand in for it.
        combined = analysis.get('all_keywords', primary)

        extracted = {
            'primary': primary,
            'long_tail': long_tail,
            'semantic': semantic,
            'all_keywords': combined,
            'search_intent': analysis.get('search_intent', 'informational'),
        }
        logger.info(f"Extracted keywords: {extracted}")
        return extracted
    except Exception as exc:
        logger.error(f"Failed to extract keywords from research data: {exc}")
        logger.error(f"Research data structure: {research_data}")
        # Fail fast: an empty keyword set is never returned as a fallback.
        raise ValueError(f"Keyword extraction failed: {exc}")
async def _run_non_ai_analyzers(self, blog_content: str, keywords_data: Dict[str, Any]) -> Dict[str, Any]:
    """Await all non-AI analyzers together and collect their results by name.

    Args:
        blog_content: The blog content handed to every analyzer.
        keywords_data: Keyword information, used by the keyword analyzer.

    Returns:
        A dict keyed by ``content_structure``, ``keyword_analysis``,
        ``readability_analysis``, ``content_quality`` and
        ``heading_structure``.

    Raises:
        Exception: The first analyzer failure (in the order above) is logged
            and re-raised.
    """
    logger.info(f"Starting non-AI analyzers with content length: {len(blog_content)} chars")
    logger.info(f"Keywords data: {keywords_data}")

    # Each label is paired with its coroutine so names and results stay aligned.
    labelled_jobs = (
        ('content_structure', self._analyze_content_structure(blog_content)),
        ('keyword_analysis', self._analyze_keyword_usage(blog_content, keywords_data)),
        ('readability_analysis', self._analyze_readability(blog_content)),
        ('content_quality', self._analyze_content_quality(blog_content)),
        ('heading_structure', self._analyze_heading_structure(blog_content)),
    )
    labels = [label for label, _ in labelled_jobs]
    jobs = [job for _, job in labelled_jobs]

    # return_exceptions=True lets every analyzer finish before failures are inspected.
    outcomes = await asyncio.gather(*jobs, return_exceptions=True)

    for label, outcome in zip(labels, outcomes):
        if isinstance(outcome, Exception):
            logger.error(f"Task {label} failed: {outcome}")
            raise outcome

    for label, outcome in zip(labels, outcomes):
        logger.info(f"{label} completed: {type(outcome).__name__} with {len(outcome) if isinstance(outcome, dict) else 'N/A'} fields")

    return dict(zip(labels, outcomes))
async def _analyze_content_structure(self, content: str) -> Dict[str, Any]:
    """Summarize the structure of markdown blog content.

    Args:
        content: Markdown text of the blog.

    Returns:
        Counts of sections, paragraphs and sentences, flags for an
        introduction, a conclusion and a call to action, the structure
        score, and structure recommendations.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        lines = content.split('\n')

        def mentions(candidates: List[str], first: str, second: str) -> bool:
            # True when any candidate line contains either phrase (case-insensitive).
            return any(first in line.lower() or second in line.lower() for line in candidates)

        # Lines starting with '##' count as sections (this also matches '###' and deeper).
        section_total = sum(1 for line in lines if line.startswith('##'))
        # Every non-blank line that is not a heading counts as a paragraph.
        paragraph_total = sum(1 for line in lines if line.strip() and not line.startswith('#'))
        # Each run of terminal punctuation counts as one sentence end.
        sentence_total = len(re.findall(r'[.!?]+', content))

        # Introduction cues are searched in the first 10 lines, conclusion cues in the last 10.
        intro_found = mentions(lines[:10], 'introduction', 'overview')
        conclusion_found = mentions(lines[-10:], 'conclusion', 'summary')
        cta_found = mentions(lines, 'call to action', 'learn more')

        score = self._calculate_structure_score(section_total, paragraph_total, intro_found, conclusion_found)

        return {
            'total_sections': section_total,
            'total_paragraphs': paragraph_total,
            'total_sentences': sentence_total,
            'has_introduction': intro_found,
            'has_conclusion': conclusion_found,
            'has_call_to_action': cta_found,
            'structure_score': score,
            'recommendations': self._get_structure_recommendations(section_total, intro_found, conclusion_found),
        }
    except Exception as exc:
        logger.error(f"Content structure analysis failed: {exc}")
        raise exc
async def _analyze_keyword_usage(self, content: str, keywords_data: Dict[str, Any]) -> Dict[str, Any]:
    """Measure how the primary keywords are used in the content.

    Args:
        content: Markdown text of the blog.
        keywords_data: Dict providing ``primary``, ``long_tail`` and
            ``semantic`` keyword lists.

    Returns:
        A dict with the three keyword lists, per-keyword density and
        distribution details, the primary keywords missing from the content,
        the keywords above 3% density, and an empty ``recommendations`` list.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        primary = keywords_data.get('primary', [])
        long_tail = keywords_data.get('long_tail', [])
        semantic = keywords_data.get('semantic', [])

        # The shared KeywordAnalyzer is still invoked; its return value is not used here.
        self.keyword_analyzer.analyze(content, primary)

        density_by_keyword: Dict[str, Any] = {}
        distribution_by_keyword: Dict[str, Any] = {}
        absent: List[Any] = []
        stuffed: List[Any] = []

        report = {
            'primary_keywords': primary,
            'long_tail_keywords': long_tail,
            'semantic_keywords': semantic,
            'keyword_density': density_by_keyword,
            'keyword_distribution': distribution_by_keyword,
            'missing_keywords': absent,
            'over_optimization': stuffed,
            'recommendations': [],
        }

        # Per-keyword density, heading presence and first position.
        for term in primary:
            share = self._calculate_keyword_density(content, term)
            density_by_keyword[term] = share
            distribution_by_keyword[term] = {
                'density': share,
                'in_headings': self._keyword_in_headings(content, term),
                'first_occurrence': content.lower().find(term.lower()),
            }

        # Primary keywords that never appear in the content (case-insensitive).
        for term in primary:
            if term.lower() not in content.lower():
                absent.append(term)

        # Anything above 3% density is flagged as over-optimized.
        stuffed.extend(term for term, share in density_by_keyword.items() if share > 3.0)

        return report
    except Exception as exc:
        logger.error(f"Keyword analysis failed: {exc}")
        raise exc
async def _analyze_readability(self, content: str) -> Dict[str, Any]:
    """Compute readability metrics for the content with ``textstat``.

    Args:
        content: Text of the blog.

    Returns:
        A dict with the raw ``textstat`` metrics, average sentence and
        paragraph lengths, the readability score, the target audience label
        and readability recommendations.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        # Result key -> textstat function producing that metric.
        metric_sources = (
            ('flesch_reading_ease', textstat.flesch_reading_ease),
            ('flesch_kincaid_grade', textstat.flesch_kincaid_grade),
            ('gunning_fog', textstat.gunning_fog),
            ('smog_index', textstat.smog_index),
            ('automated_readability', textstat.automated_readability_index),
            ('coleman_liau', textstat.coleman_liau_index),
        )
        metrics = {label: compute(content) for label, compute in metric_sources}

        sentence_len = self._calculate_avg_sentence_length(content)
        paragraph_len = self._calculate_avg_paragraph_length(content)
        score = self._calculate_readability_score(metrics)

        return {
            'metrics': metrics,
            'avg_sentence_length': sentence_len,
            'avg_paragraph_length': paragraph_len,
            'readability_score': score,
            'target_audience': self._determine_target_audience(metrics),
            'recommendations': self._get_readability_recommendations(metrics, sentence_len),
        }
    except Exception as exc:
        logger.error(f"Readability analysis failed: {exc}")
        raise exc
async def _analyze_content_quality(self, content: str) -> Dict[str, Any]:
    """Assess length, vocabulary diversity and transition-word usage.

    Args:
        content: Text of the blog.

    Returns:
        A dict with the word count, unique word count, vocabulary diversity,
        number of transition-word occurrences, depth and flow scores, and
        content-quality recommendations.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        tokens = content.split()
        total_words = len(tokens)

        # Diversity is the share of case-insensitively distinct tokens.
        distinct_words = len({token.lower() for token in tokens})
        diversity = distinct_words / total_words if total_words > 0 else 0

        # Transition words are counted as substrings of the lowercased text.
        lowered = content.lower()
        transitions = 0
        for marker in ['however', 'therefore', 'furthermore', 'moreover', 'additionally', 'consequently']:
            transitions += lowered.count(marker)

        depth = self._calculate_content_depth_score(total_words, diversity)
        flow = self._calculate_flow_score(transitions, total_words)

        return {
            'word_count': total_words,
            'unique_words': distinct_words,
            'vocabulary_diversity': diversity,
            'transition_words_used': transitions,
            'content_depth_score': depth,
            'flow_score': flow,
            'recommendations': self._get_content_quality_recommendations(total_words, diversity, transitions),
        }
    except Exception as exc:
        logger.error(f"Content quality analysis failed: {exc}")
        raise exc
async def _analyze_heading_structure(self, content: str) -> Dict[str, Any]:
    """Collect the markdown H1/H2/H3 headings and score their hierarchy.

    Args:
        content: Markdown text of the blog.

    Returns:
        A dict with the count and text of the headings at each of the three
        levels, the hierarchy score and heading recommendations.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        # One pattern per level; each matches a whole line with that exact prefix.
        top = re.findall(r'^# (.+)$', content, re.MULTILINE)
        second = re.findall(r'^## (.+)$', content, re.MULTILINE)
        third = re.findall(r'^### (.+)$', content, re.MULTILINE)

        hierarchy = self._calculate_heading_hierarchy_score(top, second, third)

        summary: Dict[str, Any] = {
            'h1_count': len(top),
            'h2_count': len(second),
            'h3_count': len(third),
            'h1_headings': top,
            'h2_headings': second,
            'h3_headings': third,
            'heading_hierarchy_score': hierarchy,
        }
        summary['recommendations'] = self._get_heading_recommendations(top, second, third)
        return summary
    except Exception as exc:
        logger.error(f"Heading structure analysis failed: {exc}")
        raise exc
# Helper methods for calculations and scoring
def _calculate_structure_score(self, sections: int, paragraphs: int, has_intro: bool, has_conclusion: bool) -> int:
"""Calculate content structure score"""
score = 0
# Section count (optimal: 3-8 sections)
if 3 <= sections <= 8:
score += 30
elif sections < 3:
score += 15
else:
score += 20
# Paragraph count (optimal: 8-20 paragraphs)
if 8 <= paragraphs <= 20:
score += 30
elif paragraphs < 8:
score += 15
else:
score += 20
# Introduction and conclusion
if has_intro:
score += 20
if has_conclusion:
score += 20
return min(score, 100)
def _calculate_keyword_density(self, content: str, keyword: str) -> float:
"""Calculate keyword density percentage"""
content_lower = content.lower()
keyword_lower = keyword.lower()
word_count = len(content.split())
keyword_count = content_lower.count(keyword_lower)
return (keyword_count / word_count * 100) if word_count > 0 else 0
def _keyword_in_headings(self, content: str, keyword: str) -> bool:
"""Check if keyword appears in headings"""
headings = re.findall(r'^#+ (.+)$', content, re.MULTILINE)
return any(keyword.lower() in heading.lower() for heading in headings)
def _calculate_avg_sentence_length(self, content: str) -> float:
"""Calculate average sentence length"""
sentences = re.split(r'[.!?]+', content)
sentences = [s.strip() for s in sentences if s.strip()]
if not sentences:
return 0
total_words = sum(len(sentence.split()) for sentence in sentences)
return total_words / len(sentences)
def _calculate_avg_paragraph_length(self, content: str) -> float:
"""Calculate average paragraph length"""
paragraphs = [p.strip() for p in content.split('\n\n') if p.strip()]
if not paragraphs:
return 0
total_words = sum(len(paragraph.split()) for paragraph in paragraphs)
return total_words / len(paragraphs)
def _calculate_readability_score(self, metrics: Dict[str, float]) -> int:
"""Calculate overall readability score"""
# Flesch Reading Ease (0-100, higher is better)
flesch_score = metrics.get('flesch_reading_ease', 0)
# Convert to 0-100 scale
if flesch_score >= 80:
return 90
elif flesch_score >= 60:
return 80
elif flesch_score >= 40:
return 70
elif flesch_score >= 20:
return 60
else:
return 50
def _determine_target_audience(self, metrics: Dict[str, float]) -> str:
"""Determine target audience based on readability metrics"""
flesch_score = metrics.get('flesch_reading_ease', 0)
if flesch_score >= 80:
return "General audience (8th grade level)"
elif flesch_score >= 60:
return "High school level"
elif flesch_score >= 40:
return "College level"
else:
return "Graduate level"
def _calculate_content_depth_score(self, word_count: int, vocabulary_diversity: float) -> int:
"""Calculate content depth score"""
score = 0
# Word count (optimal: 800-2000 words)
if 800 <= word_count <= 2000:
score += 50
elif word_count < 800:
score += 30
else:
score += 40
# Vocabulary diversity (optimal: 0.4-0.7)
if 0.4 <= vocabulary_diversity <= 0.7:
score += 50
elif vocabulary_diversity < 0.4:
score += 30
else:
score += 40
return min(score, 100)
def _calculate_flow_score(self, transition_count: int, word_count: int) -> int:
"""Calculate content flow score"""
if word_count == 0:
return 0
transition_density = transition_count / (word_count / 100)
# Optimal transition density: 1-3 per 100 words
if 1 <= transition_density <= 3:
return 90
elif transition_density < 1:
return 60
else:
return 70
def _calculate_heading_hierarchy_score(self, h1: List[str], h2: List[str], h3: List[str]) -> int:
"""Calculate heading hierarchy score"""
score = 0
# Should have exactly 1 H1
if len(h1) == 1:
score += 40
elif len(h1) == 0:
score += 20
else:
score += 10
# Should have 3-8 H2 headings
if 3 <= len(h2) <= 8:
score += 40
elif len(h2) < 3:
score += 20
else:
score += 30
# H3 headings are optional but good for structure
if len(h3) > 0:
score += 20
return min(score, 100)
def _calculate_keyword_score(self, keyword_analysis: Dict[str, Any]) -> int:
"""Calculate keyword optimization score"""
score = 0
# Check keyword density (optimal: 1-3%)
densities = keyword_analysis.get('keyword_density', {})
for keyword, density in densities.items():
if 1 <= density <= 3:
score += 30
elif density < 1:
score += 15
else:
score += 10
# Check keyword distribution
distributions = keyword_analysis.get('keyword_distribution', {})
for keyword, dist in distributions.items():
if dist.get('in_headings', False):
score += 20
if dist.get('first_occurrence', -1) < 100: # Early occurrence
score += 20
# Penalize missing keywords
missing = len(keyword_analysis.get('missing_keywords', []))
score -= missing * 10
# Penalize over-optimization
over_opt = len(keyword_analysis.get('over_optimization', []))
score -= over_opt * 15
return max(0, min(score, 100))
def _calculate_weighted_score(self, scores: Dict[str, int]) -> int:
"""Calculate weighted overall score"""
weights = {
'structure': 0.2,
'keywords': 0.25,
'readability': 0.2,
'quality': 0.15,
'headings': 0.1,
'ai_insights': 0.1
}
weighted_sum = sum(scores.get(key, 0) * weight for key, weight in weights.items())
return int(weighted_sum)
# Recommendation methods
def _get_structure_recommendations(self, sections: int, has_intro: bool, has_conclusion: bool) -> List[str]:
"""Get structure recommendations"""
recommendations = []
if sections < 3:
recommendations.append("Add more sections to improve content structure")
elif sections > 8:
recommendations.append("Consider combining some sections for better flow")
if not has_intro:
recommendations.append("Add an introduction section to set context")
if not has_conclusion:
recommendations.append("Add a conclusion section to summarize key points")
return recommendations
def _get_readability_recommendations(self, metrics: Dict[str, float], avg_sentence_length: float) -> List[str]:
"""Get readability recommendations"""
recommendations = []
flesch_score = metrics.get('flesch_reading_ease', 0)
if flesch_score < 60:
recommendations.append("Simplify language and use shorter sentences")
if avg_sentence_length > 20:
recommendations.append("Break down long sentences for better readability")
if flesch_score > 80:
recommendations.append("Consider adding more technical depth for expert audience")
return recommendations
def _get_content_quality_recommendations(self, word_count: int, vocabulary_diversity: float, transition_count: int) -> List[str]:
"""Get content quality recommendations"""
recommendations = []
if word_count < 800:
recommendations.append("Expand content with more detailed explanations")
elif word_count > 2000:
recommendations.append("Consider breaking into multiple posts")
if vocabulary_diversity < 0.4:
recommendations.append("Use more varied vocabulary to improve engagement")
if transition_count < 3:
recommendations.append("Add more transition words to improve flow")
return recommendations
def _get_heading_recommendations(self, h1: List[str], h2: List[str], h3: List[str]) -> List[str]:
"""Get heading recommendations"""
recommendations = []
if len(h1) == 0:
recommendations.append("Add a main H1 heading")
elif len(h1) > 1:
recommendations.append("Use only one H1 heading per post")
if len(h2) < 3:
recommendations.append("Add more H2 headings to structure content")
elif len(h2) > 8:
recommendations.append("Consider using H3 headings for better hierarchy")
return recommendations
async def _run_ai_analysis(self, blog_content: str, keywords_data: Dict[str, Any], non_ai_results: Dict[str, Any]) -> Dict[str, Any]:
"""Run single AI analysis for structured insights"""
try:
# Prepare context for AI analysis
context = {
'blog_content': blog_content,
'keywords_data': keywords_data,
'non_ai_results': non_ai_results
}
# Create AI prompt for structured analysis
prompt = self._create_ai_analysis_prompt(context)
# Get structured response from Gemini
schema = {
"type": "object",
"properties": {
"content_quality_insights": {
"type": "object",
"properties": {
"engagement_score": {"type": "number"},
"value_proposition": {"type": "string"},
"content_gaps": {"type": "array", "items": {"type": "string"}},
"improvement_suggestions": {"type": "array", "items": {"type": "string"}}
}
},
"seo_optimization_insights": {
"type": "object",
"properties": {
"keyword_optimization": {"type": "string"},
"content_relevance": {"type": "string"},
"search_intent_alignment": {"type": "string"},
"seo_improvements": {"type": "array", "items": {"type": "string"}}
}
},
"user_experience_insights": {
"type": "object",
"properties": {
"content_flow": {"type": "string"},
"readability_assessment": {"type": "string"},
"engagement_factors": {"type": "array", "items": {"type": "string"}},
"ux_improvements": {"type": "array", "items": {"type": "string"}}
}
},
"competitive_analysis": {
"type": "object",
"properties": {
"content_differentiation": {"type": "string"},
"unique_value": {"type": "string"},
"competitive_advantages": {"type": "array", "items": {"type": "string"}},
"market_positioning": {"type": "string"}
}
}
}
}
ai_response = self.gemini_provider(
prompt=prompt,
schema=schema,
temperature=0.2,
max_tokens=8192
)
return ai_response
except Exception as e:
logger.error(f"AI analysis failed: {e}")
# Fail fast - don't return mock data
raise e
def _create_ai_analysis_prompt(self, context: Dict[str, Any]) -> str:
"""Create AI analysis prompt"""
blog_content = context['blog_content']
keywords_data = context['keywords_data']
non_ai_results = context['non_ai_results']
prompt = f"""
Analyze this blog content for SEO optimization and user experience. Provide structured insights based on the content and keyword data.
BLOG CONTENT:
{blog_content[:2000]}...
KEYWORDS DATA:
Primary Keywords: {keywords_data.get('primary', [])}
Long-tail Keywords: {keywords_data.get('long_tail', [])}
Semantic Keywords: {keywords_data.get('semantic', [])}
Search Intent: {keywords_data.get('search_intent', 'informational')}
NON-AI ANALYSIS RESULTS:
Structure Score: {non_ai_results.get('content_structure', {}).get('structure_score', 0)}
Readability Score: {non_ai_results.get('readability_analysis', {}).get('readability_score', 0)}
Content Quality Score: {non_ai_results.get('content_quality', {}).get('content_depth_score', 0)}
Please provide:
1. Content Quality Insights: Assess engagement potential, value proposition, content gaps, and improvement suggestions
2. SEO Optimization Insights: Evaluate keyword optimization, content relevance, search intent alignment, and SEO improvements
3. User Experience Insights: Analyze content flow, readability, engagement factors, and UX improvements
4. Competitive Analysis: Identify content differentiation, unique value, competitive advantages, and market positioning
Focus on actionable insights that can improve the blog's performance and user engagement.
"""
return prompt
def _compile_blog_seo_results(self, non_ai_results: Dict[str, Any], ai_insights: Dict[str, Any], keywords_data: Dict[str, Any]) -> Dict[str, Any]:
"""Compile comprehensive SEO analysis results"""
try:
# Validate required data - fail fast if missing
if not non_ai_results:
raise ValueError("Non-AI analysis results are missing")
if not ai_insights:
raise ValueError("AI insights are missing")
# Calculate category scores
category_scores = {
'structure': non_ai_results.get('content_structure', {}).get('structure_score', 0),
'keywords': self._calculate_keyword_score(non_ai_results.get('keyword_analysis', {})),
'readability': non_ai_results.get('readability_analysis', {}).get('readability_score', 0),
'quality': non_ai_results.get('content_quality', {}).get('content_depth_score', 0),
'headings': non_ai_results.get('heading_structure', {}).get('heading_hierarchy_score', 0),
'ai_insights': ai_insights.get('content_quality_insights', {}).get('engagement_score', 0)
}
# Calculate overall score
overall_score = self._calculate_weighted_score(category_scores)
# Compile actionable recommendations
actionable_recommendations = self._compile_actionable_recommendations(non_ai_results, ai_insights)
# Create visualization data
visualization_data = self._create_visualization_data(category_scores, non_ai_results)
return {
'overall_score': overall_score,
'category_scores': category_scores,
'detailed_analysis': non_ai_results,
'ai_insights': ai_insights,
'keywords_data': keywords_data,
'visualization_data': visualization_data,
'actionable_recommendations': actionable_recommendations,
'generated_at': datetime.utcnow().isoformat(),
'analysis_summary': self._create_analysis_summary(overall_score, category_scores, ai_insights)
}
except Exception as e:
logger.error(f"Results compilation failed: {e}")
# Fail fast - don't return fallback data
raise e
def _compile_actionable_recommendations(self, non_ai_results: Dict[str, Any], ai_insights: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Compile actionable recommendations from all sources"""
recommendations = []
# Structure recommendations
structure_recs = non_ai_results.get('content_structure', {}).get('recommendations', [])
for rec in structure_recs:
recommendations.append({
'category': 'Structure',
'priority': 'High',
'recommendation': rec,
'impact': 'Improves content organization and user experience'
})
# Keyword recommendations
keyword_recs = non_ai_results.get('keyword_analysis', {}).get('recommendations', [])
for rec in keyword_recs:
recommendations.append({
'category': 'Keywords',
'priority': 'High',
'recommendation': rec,
'impact': 'Improves search engine visibility'
})
# Readability recommendations
readability_recs = non_ai_results.get('readability_analysis', {}).get('recommendations', [])
for rec in readability_recs:
recommendations.append({
'category': 'Readability',
'priority': 'Medium',
'recommendation': rec,
'impact': 'Improves user engagement and comprehension'
})
# AI insights recommendations
ai_recs = ai_insights.get('content_quality_insights', {}).get('improvement_suggestions', [])
for rec in ai_recs:
recommendations.append({
'category': 'Content Quality',
'priority': 'Medium',
'recommendation': rec,
'impact': 'Enhances content value and engagement'
})
return recommendations
def _create_visualization_data(self, category_scores: Dict[str, int], non_ai_results: Dict[str, Any]) -> Dict[str, Any]:
"""Create data for visualization components"""
return {
'score_radar': {
'categories': list(category_scores.keys()),
'scores': list(category_scores.values()),
'max_score': 100
},
'keyword_analysis': {
'densities': non_ai_results.get('keyword_analysis', {}).get('keyword_density', {}),
'missing_keywords': non_ai_results.get('keyword_analysis', {}).get('missing_keywords', []),
'over_optimization': non_ai_results.get('keyword_analysis', {}).get('over_optimization', [])
},
'readability_metrics': non_ai_results.get('readability_analysis', {}).get('metrics', {}),
'content_stats': {
'word_count': non_ai_results.get('content_quality', {}).get('word_count', 0),
'sections': non_ai_results.get('content_structure', {}).get('total_sections', 0),
'paragraphs': non_ai_results.get('content_structure', {}).get('total_paragraphs', 0)
}
}
def _create_analysis_summary(self, overall_score: int, category_scores: Dict[str, int], ai_insights: Dict[str, Any]) -> Dict[str, Any]:
"""Create analysis summary"""
# Determine overall grade
if overall_score >= 90:
grade = 'A'
status = 'Excellent'
elif overall_score >= 80:
grade = 'B'
status = 'Good'
elif overall_score >= 70:
grade = 'C'
status = 'Fair'
elif overall_score >= 60:
grade = 'D'
status = 'Needs Improvement'
else:
grade = 'F'
status = 'Poor'
# Find strongest and weakest categories
strongest_category = max(category_scores.items(), key=lambda x: x[1])
weakest_category = min(category_scores.items(), key=lambda x: x[1])
return {
'overall_grade': grade,
'status': status,
'strongest_category': strongest_category[0],
'weakest_category': weakest_category[0],
'key_strengths': self._identify_key_strengths(category_scores),
'key_weaknesses': self._identify_key_weaknesses(category_scores),
'ai_summary': ai_insights.get('content_quality_insights', {}).get('value_proposition', '')
}
def _identify_key_strengths(self, category_scores: Dict[str, int]) -> List[str]:
"""Identify key strengths"""
strengths = []
for category, score in category_scores.items():
if score >= 80:
strengths.append(f"Strong {category} optimization")
return strengths
def _identify_key_weaknesses(self, category_scores: Dict[str, int]) -> List[str]:
"""Identify key weaknesses"""
weaknesses = []
for category, score in category_scores.items():
if score < 60:
weaknesses.append(f"Needs improvement in {category}")
return weaknesses
def _create_error_result(self, error_message: str) -> Dict[str, Any]:
"""Create error result - this should not be used in fail-fast mode"""
raise ValueError(f"Error result creation not allowed in fail-fast mode: {error_message}")

View File

@@ -0,0 +1,131 @@
"""
Test script for Blog Content SEO Analyzer
This script tests the core functionality of the SEO analyzer
without requiring the full application setup.
"""
import asyncio
import sys
import os
# Add the backend directory to the Python path
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'backend'))
from services.blog_writer.seo.blog_content_seo_analyzer import BlogContentSEOAnalyzer
async def test_seo_analyzer():
    """Run the SEO analyzer on a sample post and print a readable report.

    Builds a sample markdown post and sample research data, runs
    ``BlogContentSEOAnalyzer.analyze_blog_content`` on them and prints the
    overall score, grade, per-category scores, strengths, weaknesses and the
    first five recommendations. Any exception is printed with its traceback
    instead of being propagated.
    """
    # Sample blog content
    blog_markdown = """
# The Ultimate Guide to AI-Powered Blog Writing
## Introduction
In today's digital landscape, content creation has become more important than ever. With the rise of artificial intelligence, we're seeing revolutionary changes in how we approach blog writing and content marketing.
## What is AI-Powered Blog Writing?
AI-powered blog writing refers to the use of artificial intelligence tools and technologies to assist in the creation, optimization, and management of blog content. This includes everything from research and outline generation to content creation and SEO optimization.
## Key Benefits of AI Blog Writing
### 1. Increased Efficiency
AI tools can significantly reduce the time required to create high-quality blog content. What used to take hours can now be completed in minutes.
### 2. Improved SEO Performance
AI-powered tools can analyze search trends, identify optimal keywords, and ensure content is optimized for search engines.
### 3. Enhanced Content Quality
With AI assistance, writers can focus on strategy and creativity while AI handles the technical aspects of content creation.
## Best Practices for AI Blog Writing
1. **Start with Research**: Use AI tools to gather comprehensive information about your topic
2. **Create Detailed Outlines**: Leverage AI to structure your content effectively
3. **Optimize for SEO**: Use AI analysis to ensure your content ranks well
4. **Review and Refine**: Always review AI-generated content before publishing
## Conclusion
AI-powered blog writing is transforming the content creation landscape. By leveraging these tools effectively, content creators can produce higher quality content more efficiently than ever before.
The future of content creation is here, and it's powered by artificial intelligence.
"""

    # Sample research data
    research_payload = {
        "keyword_analysis": {
            "primary": ["AI blog writing", "artificial intelligence content", "AI content creation"],
            "long_tail": ["AI-powered blog writing tools", "artificial intelligence content marketing", "AI blog writing software"],
            "semantic": ["content automation", "AI writing assistant", "automated content creation", "AI content optimization"],
            "all_keywords": ["AI blog writing", "artificial intelligence content", "AI content creation", "AI-powered blog writing tools", "artificial intelligence content marketing", "AI blog writing software", "content automation", "AI writing assistant", "automated content creation", "AI content optimization"],
            "search_intent": "informational",
        },
        "competitor_analysis": {
            "top_competitors": ["HubSpot", "Content Marketing Institute", "Copyblogger"],
            "content_gaps": ["AI-specific use cases", "ROI measurement", "implementation strategies"],
        },
        "content_angles": [
            "Beginner's guide to AI blog writing",
            "ROI of AI content creation tools",
            "AI vs human content creation comparison",
        ],
    }

    print("🚀 Starting SEO Analysis Test")
    print("=" * 50)

    try:
        # Initialize the analyzer
        seo_analyzer = BlogContentSEOAnalyzer()
        print("✅ SEO Analyzer initialized successfully")

        # Run the analysis
        print("\n📊 Running SEO analysis...")
        report = await seo_analyzer.analyze_blog_content(blog_markdown, research_payload)

        # Display results
        print("\n📈 Analysis Results:")
        print("=" * 30)

        if 'error' in report:
            print(f"❌ Analysis failed: {report['error']}")
            return

        print(f"🎯 Overall Score: {report.get('overall_score', 0)}/100")
        summary = report.get('analysis_summary', {})
        print(f"📊 Overall Grade: {summary.get('overall_grade', 'N/A')}")
        print(f"📝 Status: {summary.get('status', 'N/A')}")

        print("\n📋 Category Scores:")
        for category, score in report.get('category_scores', {}).items():
            print(f"{category.capitalize()}: {score}/100")

        print("\n💡 Key Strengths:")
        for strength in report.get('analysis_summary', {}).get('key_strengths', []):
            print(f"{strength}")

        print("\n⚠️ Areas for Improvement:")
        for weakness in report.get('analysis_summary', {}).get('key_weaknesses', []):
            print(f" 🔧 {weakness}")

        print("\n📝 Actionable Recommendations:")
        top_items = report.get('actionable_recommendations', [])[:5]  # Show first 5 recommendations
        for position, item in enumerate(top_items, start=1):
            print(f" {position}. [{item.get('category', 'N/A')}] {item.get('recommendation', 'N/A')}")

        print("\n🎉 SEO Analysis completed successfully!")
    except Exception as e:
        print(f"❌ Test failed with error: {e}")
        import traceback
        traceback.print_exc()
# Script entry point: run the async smoke test to completion when this file is
# executed directly (not when it is imported).
if __name__ == "__main__":
    asyncio.run(test_seo_analyzer())