AI Blog Writer - Implement modular architecture with research, outline, and core services

This commit is contained in:
ajaysi
2025-09-12 16:53:16 +05:30
parent c0a366269d
commit 2ae0c4a8b9
29 changed files with 3210 additions and 907 deletions

View File

@@ -0,0 +1,21 @@
"""
Research module for AI Blog Writer.
This module handles all research-related functionality including:
- Google Search grounding integration
- Keyword analysis and competitor research
- Content angle discovery
- Research caching and optimization
"""
from .research_service import ResearchService
from .keyword_analyzer import KeywordAnalyzer
from .competitor_analyzer import CompetitorAnalyzer
from .content_angle_generator import ContentAngleGenerator
# Public API of the research package: the names exported by
# `from <package> import *`, matching the classes imported above.
__all__ = [
'ResearchService',
'KeywordAnalyzer',
'CompetitorAnalyzer',
'ContentAngleGenerator'
]

View File

@@ -0,0 +1,71 @@
"""
Competitor Analyzer - AI-powered competitor analysis for research content.
Extracts competitor insights and market intelligence from research content.
"""
from typing import Dict, Any
from loguru import logger
class CompetitorAnalyzer:
    """Analyzes competitors and market intelligence from research content."""

    # Only this many leading characters of the research text are embedded in
    # the prompt.
    MAX_CONTENT_CHARS = 3000

    def analyze(self, content: str) -> Dict[str, Any]:
        """Extract competitor insights from research content with a structured AI call.

        Args:
            content: Research text to analyze. Only the first
                ``MAX_CONTENT_CHARS`` characters are sent to the model.

        Returns:
            The dictionary returned by ``gemini_structured_json_response``
            when it is a dict without an ``"error"`` key. The requested
            schema asks for ``top_competitors``, ``content_gaps``,
            ``opportunities``, ``competitive_advantages``,
            ``market_positioning``, ``industry_leaders`` and
            ``analysis_notes``.

        Raises:
            ValueError: If the provider result is not a dict or contains an
                ``"error"`` key. No fallback data is produced.
        """
        excerpt = content[:self.MAX_CONTENT_CHARS]

        # The prompt body is kept flush-left: everything between the triple
        # quotes, leading whitespace included, is sent to the model verbatim.
        competitor_prompt = f"""
Analyze the following research content and extract competitor insights:
Research Content:
{excerpt}
Extract and analyze:
1. Top competitors mentioned (companies, brands, platforms)
2. Content gaps (what competitors are missing)
3. Market opportunities (untapped areas)
4. Competitive advantages (what makes content unique)
5. Market positioning insights
6. Industry leaders and their strategies
Respond with JSON:
{{
"top_competitors": ["competitor1", "competitor2"],
"content_gaps": ["gap1", "gap2"],
"opportunities": ["opportunity1", "opportunity2"],
"competitive_advantages": ["advantage1", "advantage2"],
"market_positioning": "positioning insights",
"industry_leaders": ["leader1", "leader2"],
"analysis_notes": "Comprehensive competitor analysis summary"
}}
"""

        # Imported at call time, as in the original implementation.
        from services.llm_providers.gemini_provider import gemini_structured_json_response

        string_list = {"type": "array", "items": {"type": "string"}}
        competitor_schema = {
            "type": "object",
            "properties": {
                "top_competitors": string_list,
                "content_gaps": string_list,
                "opportunities": string_list,
                "competitive_advantages": string_list,
                "market_positioning": {"type": "string"},
                "industry_leaders": string_list,
                "analysis_notes": {"type": "string"},
            },
            "required": [
                "top_competitors",
                "content_gaps",
                "opportunities",
                "competitive_advantages",
                "market_positioning",
                "industry_leaders",
                "analysis_notes",
            ],
        }

        competitor_analysis = gemini_structured_json_response(
            prompt=competitor_prompt,
            schema=competitor_schema,
            temperature=0.3,
            max_tokens=1000,
        )

        if isinstance(competitor_analysis, dict) and 'error' not in competitor_analysis:
            return competitor_analysis

        # Fail explicitly - no fallback data.
        logger.error(f"AI competitor analysis failed: {competitor_analysis}")
        if isinstance(competitor_analysis, dict):
            reason = competitor_analysis.get('error', 'Unknown error')
        else:
            # A non-dict result has no .get(); report its type instead of
            # letting an AttributeError mask the real failure.
            reason = f"Unexpected response type: {type(competitor_analysis).__name__}"
        raise ValueError(f"Competitor analysis failed: {reason}")

View File

@@ -0,0 +1,79 @@
"""
Content Angle Generator - AI-powered content angle discovery.
Generates strategic content angles from research content for blog posts.
"""
from typing import List
from loguru import logger
class ContentAngleGenerator:
    """Generates strategic content angles from research content."""

    # Only this many leading characters of the research text are embedded in
    # the prompt.
    MAX_CONTENT_CHARS = 3000

    def generate(self, content: str, topic: str, industry: str) -> List[str]:
        """Derive strategic blog content angles from research content with a structured AI call.

        Args:
            content: Research text to analyze. Only the first
                ``MAX_CONTENT_CHARS`` characters are sent to the model.
            topic: Topic the angles are created for (interpolated into the prompt).
            industry: Industry context (interpolated into the prompt).

        Returns:
            At most seven content angles taken from the ``content_angles``
            list of the provider result.

        Raises:
            ValueError: If the provider result is not a dict, lacks a
                ``content_angles`` key, or that value is not a list.
                No fallback data is produced.
        """
        excerpt = content[:self.MAX_CONTENT_CHARS]

        # The prompt body is kept flush-left: everything between the triple
        # quotes, leading whitespace included, is sent to the model verbatim.
        angles_prompt = f"""
Analyze the following research content and create strategic content angles for: {topic} in {industry}
Research Content:
{excerpt}
Create 7 compelling content angles that:
1. Leverage current trends and data from the research
2. Address content gaps and opportunities
3. Appeal to different audience segments
4. Include unique perspectives not covered by competitors
5. Incorporate specific statistics, case studies, or expert insights
6. Create emotional connection and urgency
7. Provide actionable value to readers
Each angle should be:
- Specific and data-driven
- Unique and differentiated
- Compelling and click-worthy
- Actionable for readers
Respond with JSON:
{{
"content_angles": [
"Specific angle 1 with data/trends",
"Specific angle 2 with unique perspective",
"Specific angle 3 with actionable insights",
"Specific angle 4 with case study focus",
"Specific angle 5 with future outlook",
"Specific angle 6 with problem-solving focus",
"Specific angle 7 with industry insights"
]
}}
"""

        # Imported at call time, as in the original implementation.
        from services.llm_providers.gemini_provider import gemini_structured_json_response

        angles_schema = {
            "type": "object",
            "properties": {
                "content_angles": {
                    "type": "array",
                    "items": {"type": "string"},
                    "minItems": 5,
                    "maxItems": 7,
                },
            },
            "required": ["content_angles"],
        }

        angles_result = gemini_structured_json_response(
            prompt=angles_prompt,
            schema=angles_schema,
            temperature=0.7,
            max_tokens=800,
        )

        if isinstance(angles_result, dict) and 'content_angles' in angles_result:
            angles = angles_result['content_angles']
            if isinstance(angles, list):
                # Enforce the upper bound locally as well as in the schema.
                return angles[:7]
            reason = f"'content_angles' is not a list: {type(angles).__name__}"
        elif isinstance(angles_result, dict):
            reason = angles_result.get('error', 'Unknown error')
        else:
            # A non-dict result has no .get(); report its type instead of
            # letting an AttributeError mask the real failure.
            reason = f"Unexpected response type: {type(angles_result).__name__}"

        # Fail explicitly - no fallback data.
        logger.error(f"AI content angles generation failed: {angles_result}")
        raise ValueError(f"Content angles generation failed: {reason}")

View File

@@ -0,0 +1,78 @@
"""
Keyword Analyzer - AI-powered keyword analysis for research content.
Extracts and analyzes keywords from research content using structured AI responses.
"""
from typing import Dict, Any, List
from loguru import logger
class KeywordAnalyzer:
    """Analyzes keywords from research content using AI-powered extraction."""

    # Only this many leading characters of the research text are embedded in
    # the prompt, to stay clear of token limits.
    MAX_CONTENT_CHARS = 3000

    def analyze(self, content: str, original_keywords: List[str]) -> Dict[str, Any]:
        """Extract keyword insights from research content with a structured AI call.

        Args:
            content: Research text to analyze. Only the first
                ``MAX_CONTENT_CHARS`` characters are sent to the model.
            original_keywords: Keywords the analysis is centered on; joined
                with ", " into the prompt. ``None`` is treated as empty.

        Returns:
            The dictionary returned by ``gemini_structured_json_response``
            when it is a dict without an ``"error"`` key. The requested
            schema asks for ``primary``, ``secondary``, ``long_tail``,
            ``search_intent``, ``difficulty``, ``content_gaps``,
            ``semantic_keywords``, ``trending_terms`` and
            ``analysis_insights``.

        Raises:
            ValueError: If the provider result is not a dict or contains an
                ``"error"`` key. No fallback data is produced.
        """
        keyword_list = ', '.join(original_keywords or [])
        excerpt = content[:self.MAX_CONTENT_CHARS]

        # The prompt body is kept flush-left: everything between the triple
        # quotes is sent to the model verbatim. For the same reason no
        # "# ..." remark may appear inside it; it would become prompt text.
        keyword_prompt = f"""
Analyze the following research content and extract comprehensive keyword insights for: {keyword_list}
Research Content:
{excerpt}
Extract and analyze:
1. Primary keywords (main topic terms)
2. Secondary keywords (related terms, synonyms)
3. Long-tail opportunities (specific phrases people search for)
4. Search intent (informational, commercial, navigational, transactional)
5. Keyword difficulty assessment (1-10 scale)
6. Content gaps (what competitors are missing)
7. Semantic keywords (related concepts)
8. Trending terms (emerging keywords)
Respond with JSON:
{{
"primary": ["keyword1", "keyword2"],
"secondary": ["related1", "related2"],
"long_tail": ["specific phrase 1", "specific phrase 2"],
"search_intent": "informational|commercial|navigational|transactional",
"difficulty": 7,
"content_gaps": ["gap1", "gap2"],
"semantic_keywords": ["concept1", "concept2"],
"trending_terms": ["trend1", "trend2"],
"analysis_insights": "Brief analysis of keyword landscape"
}}
"""

        # Imported at call time, as in the original implementation.
        from services.llm_providers.gemini_provider import gemini_structured_json_response

        string_list = {"type": "array", "items": {"type": "string"}}
        keyword_schema = {
            "type": "object",
            "properties": {
                "primary": string_list,
                "secondary": string_list,
                "long_tail": string_list,
                "search_intent": {"type": "string"},
                "difficulty": {"type": "integer"},
                "content_gaps": string_list,
                "semantic_keywords": string_list,
                "trending_terms": string_list,
                "analysis_insights": {"type": "string"},
            },
            "required": [
                "primary",
                "secondary",
                "long_tail",
                "search_intent",
                "difficulty",
                "content_gaps",
                "semantic_keywords",
                "trending_terms",
                "analysis_insights",
            ],
        }

        keyword_analysis = gemini_structured_json_response(
            prompt=keyword_prompt,
            schema=keyword_schema,
            temperature=0.3,
            max_tokens=1000,
        )

        if isinstance(keyword_analysis, dict) and 'error' not in keyword_analysis:
            return keyword_analysis

        # Fail explicitly - no fallback data.
        logger.error(f"AI keyword analysis failed: {keyword_analysis}")
        if isinstance(keyword_analysis, dict):
            reason = keyword_analysis.get('error', 'Unknown error')
        else:
            # A non-dict result has no .get(); report its type instead of
            # letting an AttributeError mask the real failure.
            reason = f"Unexpected response type: {type(keyword_analysis).__name__}"
        raise ValueError(f"Keyword analysis failed: {reason}")

View File

@@ -0,0 +1,268 @@
"""
Research Service - Core research functionality for AI Blog Writer.
Handles Google Search grounding, caching, and research orchestration.
"""
from typing import Dict, Any, List
from loguru import logger
from models.blog_models import (
BlogResearchRequest,
BlogResearchResponse,
ResearchSource,
)
from .keyword_analyzer import KeywordAnalyzer
from .competitor_analyzer import CompetitorAnalyzer
from .content_angle_generator import ContentAngleGenerator
class ResearchService:
    """Service for conducting comprehensive research using Google Search grounding."""

    def __init__(self):
        """Create the analyzers that post-process the grounded research text."""
        self.keyword_analyzer = KeywordAnalyzer()
        self.competitor_analyzer = CompetitorAnalyzer()
        self.content_angle_generator = ContentAngleGenerator()

    async def research(self, request: BlogResearchRequest) -> BlogResearchResponse:
        """
        Stage 1: Research & Strategy (AI Orchestration)

        Checks the research cache for an exact keyword match. On a miss it
        makes one Gemini call with native Google Search grounding and feeds
        the returned text to the keyword, competitor and content-angle
        analyzers. Successful results are cached.

        Args:
            request: Research request; ``topic``, ``keywords``, ``industry``
                and ``persona`` are read from it.

        Returns:
            A response with ``success=True`` and the research data, or with
            ``success=False`` and ``error_message`` set when any step raised.
            Failures are reported through the response, not raised.
        """
        return await self._run_research(request)

    async def research_with_progress(self, request: BlogResearchRequest, task_id: str) -> BlogResearchResponse:
        """
        Research method with progress updates for real-time feedback.

        Runs the same pipeline as ``research`` and additionally reports each
        stage through ``api.blog_writer.router._update_progress``.

        Args:
            request: Research request, as for ``research``.
            task_id: Identifier passed to ``_update_progress`` with every
                progress message.

        Returns:
            Same contract as ``research``.
        """
        return await self._run_research(request, task_id=task_id, report_progress=True)

    async def _run_research(
        self,
        request: BlogResearchRequest,
        task_id: str = "",
        report_progress: bool = False,
    ) -> BlogResearchResponse:
        """Shared pipeline behind ``research`` and ``research_with_progress``.

        Any exception raised along the way, including a failing import or
        progress update, is logged and converted into a failure response.
        """
        try:
            # Imported inside the try so that an import failure is reported
            # through the failure response like any other error.
            from services.llm_providers.gemini_grounded_provider import GeminiGroundedProvider
            from services.cache.research_cache import research_cache
            if report_progress:
                from api.blog_writer.router import _update_progress

            async def notify(message: str) -> None:
                """Forward a progress message when progress reporting is enabled."""
                if report_progress:
                    await _update_progress(task_id, message)

            persona = request.persona
            topic = request.topic or ", ".join(request.keywords)
            industry = request.industry or (
                persona.industry if persona and persona.industry else "General"
            )
            # Falls back to 'General' when the persona is absent or its
            # target_audience is missing or empty, so that "None" never ends
            # up in the prompt or the cache key.
            target_audience = (
                getattr(persona, 'target_audience', None) if persona else None
            ) or 'General'

            # Check cache first for exact keyword match
            await notify("🔍 Checking cache for existing research...")
            cached_result = research_cache.get_cached_result(
                keywords=request.keywords,
                industry=industry,
                target_audience=target_audience
            )
            if cached_result:
                await notify("✅ Found cached research results! Returning instantly...")
                logger.info(f"Returning cached research result for keywords: {request.keywords}")
                return BlogResearchResponse(**cached_result)

            # Cache miss - proceed with API call
            await notify("🌐 Cache miss - connecting to Google Search grounding...")
            logger.info(f"Cache miss - making API call for keywords: {request.keywords}")
            gemini = GeminiGroundedProvider()
            research_prompt = self._build_research_prompt(topic, industry, target_audience)

            await notify("🤖 Making AI request to Gemini with Google Search grounding...")
            # Single Gemini call with native Google Search grounding - no fallbacks
            gemini_result = await gemini.generate_grounded_content(
                prompt=research_prompt,
                content_type="research",
                max_tokens=2000
            )

            await notify("📊 Processing research results and extracting insights...")
            sources = self._extract_sources_from_grounding(gemini_result)
            # Search widget and queries are passed through for UI display
            search_widget = gemini_result.get("search_widget", "") or ""
            search_queries = gemini_result.get("search_queries", []) or []

            await notify("🔍 Analyzing keywords and content angles...")
            # NOTE(review): the analyzers are synchronous and are called
            # directly from this coroutine; if they perform blocking network
            # I/O they will stall the event loop - confirm and consider
            # offloading them to a worker thread.
            content = gemini_result.get("content", "")
            keyword_analysis = self.keyword_analyzer.analyze(content, request.keywords)
            competitor_analysis = self.competitor_analyzer.analyze(content)
            suggested_angles = self.content_angle_generator.generate(content, topic, industry)

            await notify("💾 Caching results for future use...")
            logger.info(f"Research completed successfully with {len(sources)} sources and {len(search_queries)} search queries")

            response = BlogResearchResponse(
                success=True,
                sources=sources,
                keyword_analysis=keyword_analysis,
                competitor_analysis=competitor_analysis,
                suggested_angles=suggested_angles,
                search_widget=search_widget,
                search_queries=search_queries,
            )

            # Cache the successful result for future exact keyword matches.
            # A cache write failure must not discard research that already
            # succeeded, so it is logged and the response is still returned.
            try:
                research_cache.cache_result(
                    keywords=request.keywords,
                    industry=industry,
                    target_audience=target_audience,
                    result=response.dict()
                )
            except Exception as cache_error:
                logger.warning(f"Failed to cache research result: {cache_error}")

            return response
        except Exception as e:
            error_message = str(e)
            logger.error(f"Research failed: {error_message}")
            # Return a graceful failure response instead of raising
            return self._failure_response(error_message)

    @staticmethod
    def _build_research_prompt(topic: str, industry: str, target_audience: str) -> str:
        """Return the comprehensive research prompt for the grounded Gemini call."""
        # The prompt body is kept flush-left: everything between the triple
        # quotes, leading whitespace included, is sent to the model verbatim.
        return f"""
Research the topic "{topic}" in the {industry} industry for {target_audience} audience. Provide a comprehensive analysis including:
1. Current trends and insights (2024-2025)
2. Key statistics and data points with sources
3. Industry expert opinions and quotes
4. Recent developments and news
5. Market analysis and forecasts
6. Best practices and case studies
7. Keyword analysis: primary, secondary, and long-tail opportunities
8. Competitor analysis: top players and content gaps
9. Content angle suggestions: 5 compelling angles for blog posts
Focus on factual, up-to-date information from credible sources.
Include specific data points, percentages, and recent developments.
Structure your response with clear sections for each analysis area.
"""

    @staticmethod
    def _failure_response(error_message: str) -> BlogResearchResponse:
        """Build the empty ``success=False`` response carrying ``error_message``."""
        return BlogResearchResponse(
            success=False,
            sources=[],
            keyword_analysis={},
            competitor_analysis={},
            suggested_angles=[],
            search_widget="",
            search_queries=[],
            error_message=error_message
        )

    def _extract_sources_from_grounding(self, gemini_result: Dict[str, Any]) -> List[ResearchSource]:
        """Convert the ``sources`` entries of a grounded Gemini result into ``ResearchSource`` objects.

        Args:
            gemini_result: Result dict of the grounded provider; its
                ``"sources"`` value is expected to be a list of dicts.
                A missing or ``None`` value yields an empty list.

        Returns:
            One ``ResearchSource`` per raw source, in the original order.
        """
        sources = []
        # The Gemini grounded provider already extracts sources and puts them in the 'sources' field
        raw_sources = gemini_result.get("sources", []) or []
        for src in raw_sources:
            body = src.get("content")
            if body:
                excerpt = body[:500]
            else:
                excerpt = f"Source from {src.get('title', 'web')}"

            # Explicit None checks: a key that is present but None would
            # otherwise crash float() or produce the literal string "None".
            score = src.get("credibility_score")
            published = src.get("publication_date")

            sources.append(
                ResearchSource(
                    title=src.get("title", "Untitled"),
                    url=src.get("url", ""),
                    excerpt=excerpt,
                    credibility_score=float(score) if score is not None else 0.8,
                    published_at=str(published) if published is not None else "2024-01-01",
                )
            )
        return sources