Research component integration, CopilotKit implementation, SEO CopilotKit implementation, Wix SEO metadata completion, Wix SEO metadata review

This commit is contained in:
ajaysi
2025-11-03 16:01:44 +05:30
parent de4328175d
commit e69107b07c
94 changed files with 9748 additions and 1565 deletions

View File

@@ -13,11 +13,17 @@ from .keyword_analyzer import KeywordAnalyzer
from .competitor_analyzer import CompetitorAnalyzer
from .content_angle_generator import ContentAngleGenerator
from .data_filter import ResearchDataFilter
from .base_provider import ResearchProvider as BaseResearchProvider
from .google_provider import GoogleResearchProvider
from .exa_provider import ExaResearchProvider
__all__ = [
'ResearchService',
'KeywordAnalyzer',
'CompetitorAnalyzer',
'ContentAngleGenerator',
'ResearchDataFilter'
'ResearchDataFilter',
'BaseResearchProvider',
'GoogleResearchProvider',
'ExaResearchProvider',
]

View File

@@ -0,0 +1,37 @@
"""
Base Research Provider Interface
Abstract base class for research provider implementations.
Ensures consistency across different research providers (Google, Exa, etc.)
"""
from abc import ABC, abstractmethod
from typing import Dict, Any
class ResearchProvider(ABC):
    """Interface every research backend has to implement.

    The three methods below are abstract, so a subclass can only be
    instantiated once it overrides all of them.
    """

    @abstractmethod
    async def search(
        self,
        prompt: str,
        topic: str,
        industry: str,
        target_audience: str,
        config: Any,  # ResearchConfig
        user_id: str,
    ) -> Dict[str, Any]:
        """Run the research request and hand back the provider's raw result dict."""

    @abstractmethod
    def get_provider_enum(self):
        """Return the APIProvider enum member used for subscription tracking."""

    @abstractmethod
    def estimate_tokens(self) -> int:
        """Return a token-usage estimate for pre-flight validation."""

View File

@@ -0,0 +1,188 @@
"""
Exa Research Provider
Neural search implementation using Exa API for high-quality, citation-rich research.
"""
from exa_py import Exa
import os
from loguru import logger
from models.subscription_models import APIProvider
from .base_provider import ResearchProvider as BaseProvider
class ExaResearchProvider(BaseProvider):
    """Research provider backed by Exa search.

    Builds a query from topic / industry / audience, runs
    ``search_and_contents`` on the Exa client and reshapes the response into
    a plain dict with the keys ``sources``, ``content``, ``search_type``,
    ``provider``, ``search_queries`` and ``cost``.
    """

    # Fallback per-search cost, used when the response carries no cost figure.
    _DEFAULT_COST = 0.005
    # Upper bound applied to ``config.max_sources`` before calling Exa.
    _MAX_RESULTS = 25
    # Source-type value -> Exa ``category`` parameter.
    _CATEGORY_MAP = {
        'research paper': 'research paper',
        'news': 'news',
        'web': 'personal site',
        'industry': 'company',
        'expert': 'linkedin profile',
    }
    # URL fragments that classify a result as a news source.
    _NEWS_DOMAINS = ('cnn.com', 'bbc.com', 'reuters.com', 'theguardian.com')

    def __init__(self):
        """Create the Exa client from the ``EXA_API_KEY`` environment variable.

        Raises:
            RuntimeError: if ``EXA_API_KEY`` is unset or empty. Callers match
                on the exact message to fall back to another provider, so
                the text must not change.
        """
        self.api_key = os.getenv("EXA_API_KEY")
        if not self.api_key:
            raise RuntimeError("EXA_API_KEY not configured")
        self.exa = Exa(self.api_key)
        logger.info("✅ Exa Research Provider initialized")

    async def search(self, prompt, topic, industry, target_audience, config, user_id):
        """Execute an Exa search and return standardized results.

        Args:
            prompt: Unused here; accepted for interface compatibility.
            topic: Research topic, first component of the query.
            industry: Industry, second component of the query.
            target_audience: Audience, third component of the query.
            config: Object exposing ``source_types`` and ``max_sources``.
            user_id: Unused here; accepted for interface compatibility.

        Returns:
            dict with ``sources``, ``content``, ``search_type``, ``provider``,
            ``search_queries`` and ``cost`` (``{'total': <float>}``).
        """
        import asyncio
        import functools

        query = f"{topic} {industry} {target_audience}"
        category = self._map_source_type_to_category(config.source_types)
        logger.info(f"[Exa Research] Executing search: {query}")

        # NOTE(review): confirm the installed exa_py version accepts the
        # nested ``contents=`` keyword; some releases take text/summary/
        # highlights as top-level keyword arguments instead.
        request = functools.partial(
            self.exa.search_and_contents,
            query,
            type="auto",
            category=category,
            num_results=min(config.max_sources, self._MAX_RESULTS),
            contents={
                'text': {'max_characters': 1000},
                'summary': {'query': f"Key insights about {topic}"},
                'highlights': {
                    'num_sentences': 2,
                    'highlights_per_url': 3,
                },
            },
        )
        # The client call is synchronous; run it in the default executor so
        # this coroutine does not block the event loop during the request.
        results = await asyncio.get_running_loop().run_in_executor(None, request)

        hits = results.results
        sources = self._transform_sources(hits)
        content = self._aggregate_content(hits)

        # Accept both snake_case and camelCase attribute spellings.
        search_type = self._first_attr(
            results, 'resolved_search_type', 'resolvedSearchType', default='neural'
        )
        cost_info = self._first_attr(results, 'cost_dollars', 'costDollars')
        cost = self._first_attr(cost_info, 'total', default=self._DEFAULT_COST)

        logger.info(f"[Exa Research] Search completed: {len(sources)} sources, type: {search_type}")
        return {
            'sources': sources,
            'content': content,
            'search_type': search_type,
            'provider': 'exa',
            'search_queries': [query],
            'cost': {'total': cost},
        }

    def get_provider_enum(self):
        """Return EXA provider enum for subscription tracking."""
        return APIProvider.EXA

    def estimate_tokens(self) -> int:
        """Estimate token usage for Exa (not token-based)."""
        return 0  # Exa is per-search, not token-based

    @staticmethod
    def _first_attr(obj, *names, default=None):
        """Return the first non-None attribute of ``obj`` among ``names``, else ``default``."""
        for name in names:
            value = getattr(obj, name, None)
            if value is not None:
                return value
        return default

    def _map_source_type_to_category(self, source_types):
        """Map the first recognised source type to an Exa category.

        Items may be enum members (their ``.value`` is used) or plain
        strings. Returns ``None`` when nothing is given or nothing matches.
        """
        if not source_types:
            return None
        for source_type in source_types:
            key = getattr(source_type, 'value', source_type)
            if key in self._CATEGORY_MAP:
                return self._CATEGORY_MAP[key]
        return None

    def _transform_sources(self, results):
        """Convert Exa result objects into a list of source dicts.

        Missing or ``None`` fields are normalised to empty strings / lists so
        consumers always receive consistent types; ``published_at`` stays
        ``None`` when unknown.
        """
        sources = []
        for idx, result in enumerate(results):
            url = getattr(result, 'url', None) or ''
            sources.append({
                'title': getattr(result, 'title', None) or '',
                'url': url,
                'excerpt': self._get_excerpt(result),
                'credibility_score': 0.85,  # fixed score assigned to every Exa result
                'published_at': self._first_attr(result, 'published_date', 'publishedDate'),
                'index': idx,
                'source_type': self._determine_source_type(url),
                'content': getattr(result, 'text', None) or '',
                'highlights': getattr(result, 'highlights', None) or [],
                'summary': getattr(result, 'summary', None) or '',
            })
        return sources

    def _get_excerpt(self, result):
        """Return the first 500 characters of the text, else the summary, else ''."""
        text = getattr(result, 'text', None)
        if text:
            return text[:500]
        return getattr(result, 'summary', None) or ''

    def _determine_source_type(self, url):
        """Classify a URL as 'academic', 'news', 'expert' or 'web' by substring match."""
        if not url:
            return 'web'
        url_lower = url.lower()
        if 'arxiv.org' in url_lower or 'research' in url_lower:
            return 'academic'
        if any(domain in url_lower for domain in self._NEWS_DOMAINS):
            return 'news'
        if 'linkedin.com' in url_lower:
            return 'expert'
        return 'web'

    def _aggregate_content(self, results):
        """Join per-source summaries (or truncated text) into one string for LLM analysis.

        Results with neither a summary nor text are skipped; numbering
        follows the position in ``results``, so gaps are possible.
        """
        content_parts = []
        for idx, result in enumerate(results):
            summary = getattr(result, 'summary', None)
            text = getattr(result, 'text', None)
            if summary:
                content_parts.append(f"Source {idx + 1}: {summary}")
            elif text:
                content_parts.append(f"Source {idx + 1}: {text[:1000]}")
        return "\n\n".join(content_parts)

    def track_exa_usage(self, user_id: str, cost: float):
        """Record one Exa call and its cost in ``usage_summaries``.

        Best effort: failures are logged and rolled back, never raised, so a
        tracking problem cannot fail an already completed search.
        """
        from services.database import get_db
        from services.subscription import PricingService
        from sqlalchemy import text

        db = next(get_db())
        try:
            pricing_service = PricingService(db)
            current_period = pricing_service.get_current_billing_period(user_id)
            # COALESCE every counter: a NULL total would otherwise stay NULL
            # after "NULL + 1" and the call would never be counted.
            update_query = text("""
                UPDATE usage_summaries
                SET exa_calls = COALESCE(exa_calls, 0) + 1,
                    exa_cost = COALESCE(exa_cost, 0) + :cost,
                    total_calls = COALESCE(total_calls, 0) + 1,
                    total_cost = COALESCE(total_cost, 0) + :cost
                WHERE user_id = :user_id AND billing_period = :period
            """)
            outcome = db.execute(update_query, {
                'cost': cost,
                'user_id': user_id,
                'period': current_period,
            })
            db.commit()
            if getattr(outcome, 'rowcount', None) == 0:
                # No summary row for this user/period: nothing was recorded.
                logger.warning(
                    f"[Exa] No usage summary row to update: user={user_id}, period={current_period}"
                )
            else:
                logger.info(f"[Exa] Tracked usage: user={user_id}, cost=${cost}")
        except Exception as e:
            logger.error(f"[Exa] Failed to track usage: {e}")
            db.rollback()
        finally:
            db.close()

View File

@@ -0,0 +1,40 @@
"""
Google Research Provider
Wrapper for Gemini native Google Search grounding to match base provider interface.
"""
from services.llm_providers.gemini_grounded_provider import GeminiGroundedProvider
from models.subscription_models import APIProvider
from .base_provider import ResearchProvider as BaseProvider
from loguru import logger
class GoogleResearchProvider(BaseProvider):
    """Adapter that exposes GeminiGroundedProvider through the provider interface."""

    def __init__(self):
        self.gemini = GeminiGroundedProvider()

    async def search(self, prompt, topic, industry, target_audience, config, user_id):
        """Forward the prompt to Gemini grounded generation and return its result.

        Only ``prompt`` and ``user_id`` are passed on; ``topic`` is used for
        logging and the remaining arguments exist for interface parity. The
        call is made with ``validate_subsequent_operations=True``.
        """
        logger.info(f"[Google Research] Executing search for topic: {topic}")
        call_options = {
            "prompt": prompt,
            "content_type": "research",
            "max_tokens": 2000,
            "user_id": user_id,
            "validate_subsequent_operations": True,
        }
        return await self.gemini.generate_grounded_content(**call_options)

    def get_provider_enum(self):
        """Return the GEMINI provider enum used for subscription tracking."""
        return APIProvider.GEMINI

    def estimate_tokens(self) -> int:
        """Return a fixed token estimate for a grounded Google request."""
        return 1200  # Conservative estimate

View File

@@ -16,6 +16,9 @@ from models.blog_models import (
GroundingChunk,
GroundingSupport,
Citation,
ResearchConfig,
ResearchMode,
ResearchProvider,
)
from services.blog_writer.logger_config import blog_writer_logger, log_function_call
from fastapi import HTTPException
@@ -24,6 +27,7 @@ from .keyword_analyzer import KeywordAnalyzer
from .competitor_analyzer import CompetitorAnalyzer
from .content_angle_generator import ContentAngleGenerator
from .data_filter import ResearchDataFilter
from .research_strategies import get_strategy_for_mode
class ResearchService:
@@ -44,7 +48,6 @@ class ResearchService:
Includes intelligent caching for exact keyword matches.
"""
try:
from services.llm_providers.gemini_grounded_provider import GeminiGroundedProvider
from services.cache.research_cache import research_cache
topic = request.topic or ", ".join(request.keywords)
@@ -79,62 +82,104 @@ class ResearchService:
# Cache miss - proceed with API call
logger.info(f"Cache miss - making API call for keywords: {request.keywords}")
blog_writer_logger.log_operation_start("gemini_api_call", api_name="gemini_grounded", operation="research")
gemini = GeminiGroundedProvider()
blog_writer_logger.log_operation_start("research_api_call", api_name="research", operation="research")
# Single comprehensive research prompt - Gemini handles Google Search automatically
research_prompt = f"""
Research the topic "{topic}" in the {industry} industry for {target_audience} audience. Provide a comprehensive analysis including:
1. Current trends and insights (2024-2025)
2. Key statistics and data points with sources
3. Industry expert opinions and quotes
4. Recent developments and news
5. Market analysis and forecasts
6. Best practices and case studies
7. Keyword analysis: primary, secondary, and long-tail opportunities
8. Competitor analysis: top players and content gaps
9. Content angle suggestions: 5 compelling angles for blog posts
Focus on factual, up-to-date information from credible sources.
Include specific data points, percentages, and recent developments.
Structure your response with clear sections for each analysis area.
"""
# Determine research mode and get appropriate strategy
research_mode = request.research_mode or ResearchMode.BASIC
config = request.config or ResearchConfig(mode=research_mode, provider=ResearchProvider.GOOGLE)
strategy = get_strategy_for_mode(research_mode)
# Single Gemini call with native Google Search grounding - no fallbacks
# Validation is handled inside generate_grounded_content when validate_subsequent_operations=True
import time
api_start_time = time.time()
gemini_result = await gemini.generate_grounded_content(
prompt=research_prompt,
content_type="research",
max_tokens=2000,
user_id=user_id,
validate_subsequent_operations=True # Validates Google Grounding + 3 LLM calls
)
api_duration_ms = (time.time() - api_start_time) * 1000
logger.info(f"Research: mode={research_mode.value}, provider={config.provider.value}")
# Log API call performance
blog_writer_logger.log_api_call(
"gemini_grounded",
"generate_grounded_content",
api_duration_ms,
token_usage=gemini_result.get("token_usage", {}),
content_length=len(gemini_result.get("content", ""))
)
# Build research prompt based on strategy
research_prompt = strategy.build_research_prompt(topic, industry, target_audience, config)
# Extract sources from grounding metadata
sources = self._extract_sources_from_grounding(gemini_result)
# Route to appropriate provider
if config.provider == ResearchProvider.EXA:
# Exa research workflow
from .exa_provider import ExaResearchProvider
from services.subscription.preflight_validator import validate_exa_research_operations
from services.database import get_db
from services.subscription import PricingService
import os
import time
# Pre-flight validation
db_val = next(get_db())
try:
pricing_service = PricingService(db_val)
gpt_provider = os.getenv("GPT_PROVIDER", "google")
validate_exa_research_operations(pricing_service, user_id, gpt_provider)
finally:
db_val.close()
# Execute Exa search
api_start_time = time.time()
try:
exa_provider = ExaResearchProvider()
raw_result = await exa_provider.search(
research_prompt, topic, industry, target_audience, config, user_id
)
api_duration_ms = (time.time() - api_start_time) * 1000
# Track usage
cost = raw_result.get('cost', {}).get('total', 0.005) if isinstance(raw_result.get('cost'), dict) else 0.005
exa_provider.track_exa_usage(user_id, cost)
# Log API call performance
blog_writer_logger.log_api_call(
"exa_search",
"search_and_contents",
api_duration_ms,
token_usage={},
content_length=len(raw_result.get('content', ''))
)
# Extract content for downstream analysis
content = raw_result.get('content', '')
sources = raw_result.get('sources', [])
search_widget = "" # Exa doesn't provide search widgets
search_queries = raw_result.get('search_queries', [])
grounding_metadata = None # Exa doesn't provide grounding metadata
except RuntimeError as e:
if "EXA_API_KEY not configured" in str(e):
logger.warning("Exa not configured, falling back to Google")
config.provider = ResearchProvider.GOOGLE
# Continue to Google flow below
raw_result = None
else:
raise
if config.provider != ResearchProvider.EXA:
# Google research (existing flow) or fallback from Exa
from .google_provider import GoogleResearchProvider
import time
api_start_time = time.time()
google_provider = GoogleResearchProvider()
gemini_result = await google_provider.search(
research_prompt, topic, industry, target_audience, config, user_id
)
api_duration_ms = (time.time() - api_start_time) * 1000
# Log API call performance
blog_writer_logger.log_api_call(
"gemini_grounded",
"generate_grounded_content",
api_duration_ms,
token_usage=gemini_result.get("token_usage", {}),
content_length=len(gemini_result.get("content", ""))
)
# Extract sources and content
sources = self._extract_sources_from_grounding(gemini_result)
content = gemini_result.get("content", "")
search_widget = gemini_result.get("search_widget", "") or ""
search_queries = gemini_result.get("search_queries", []) or []
grounding_metadata = self._extract_grounding_metadata(gemini_result)
# Extract grounding metadata for detailed UI display
grounding_metadata = self._extract_grounding_metadata(gemini_result)
# Extract search widget and queries for UI display
search_widget = gemini_result.get("search_widget", "") or ""
search_queries = gemini_result.get("search_queries", []) or []
# Parse the comprehensive response for different analysis components
content = gemini_result.get("content", "")
# Continue with common analysis (same for both providers)
keyword_analysis = self.keyword_analyzer.analyze(content, request.keywords, user_id=user_id)
competitor_analysis = self.competitor_analyzer.analyze(content, user_id=user_id)
suggested_angles = self.content_angle_generator.generate(content, topic, industry, user_id=user_id)
@@ -261,7 +306,6 @@ class ResearchService:
Research method with progress updates for real-time feedback.
"""
try:
from services.llm_providers.gemini_grounded_provider import GeminiGroundedProvider
from services.cache.research_cache import research_cache
from services.cache.persistent_research_cache import persistent_research_cache
from api.blog_writer.task_manager import task_manager
@@ -293,66 +337,100 @@ class ResearchService:
logger.info(f"Returning cached research result for keywords: {request.keywords}")
return BlogResearchResponse(**cached_result)
# User ID validation (validation logic is now in Google Grounding provider)
# User ID validation
if not user_id:
await task_manager.update_progress(task_id, "❌ Error: User ID is required for research operation")
raise ValueError("user_id is required for research operation. Please provide Clerk user ID.")
# Cache miss - proceed with API call
await task_manager.update_progress(task_id, "🌐 Cache miss - connecting to Google Search grounding...")
logger.info(f"Cache miss - making API call for keywords: {request.keywords}")
gemini = GeminiGroundedProvider()
# Single comprehensive research prompt - Gemini handles Google Search automatically
research_prompt = f"""
Research the topic "{topic}" in the {industry} industry for {target_audience} audience. Provide a comprehensive analysis including:
1. Current trends and insights (2024-2025)
2. Key statistics and data points with sources
3. Industry expert opinions and quotes
4. Recent developments and news
5. Market analysis and forecasts
6. Best practices and case studies
7. Keyword analysis: primary, secondary, and long-tail opportunities
8. Competitor analysis: top players and content gaps
9. Content angle suggestions: 5 compelling angles for blog posts
Focus on factual, up-to-date information from credible sources.
Include specific data points, percentages, and recent developments.
Structure your response with clear sections for each analysis area.
"""
# Determine research mode and get appropriate strategy
research_mode = request.research_mode or ResearchMode.BASIC
config = request.config or ResearchConfig(mode=research_mode, provider=ResearchProvider.GOOGLE)
strategy = get_strategy_for_mode(research_mode)
await task_manager.update_progress(task_id, "🤖 Making AI request to Gemini with Google Search grounding...")
# Single Gemini call with native Google Search grounding - no fallbacks
# Validation is handled inside generate_grounded_content when validate_subsequent_operations=True
try:
gemini_result = await gemini.generate_grounded_content(
prompt=research_prompt,
content_type="research",
max_tokens=2000,
user_id=user_id,
validate_subsequent_operations=True # Validates Google Grounding + 3 LLM calls
)
except HTTPException as http_error:
# Re-raise HTTPException so it can be properly handled by task manager
logger.error(f"Subscription limit exceeded for research: {http_error.detail}")
await task_manager.update_progress(task_id, f"❌ Subscription limit exceeded: {http_error.detail.get('message', str(http_error.detail)) if isinstance(http_error.detail, dict) else str(http_error.detail)}")
raise # Re-raise HTTPException to preserve status code and error details
logger.info(f"Research: mode={research_mode.value}, provider={config.provider.value}")
await task_manager.update_progress(task_id, "📊 Processing research results and extracting insights...")
# Extract sources from grounding metadata
sources = self._extract_sources_from_grounding(gemini_result)
# Build research prompt based on strategy
research_prompt = strategy.build_research_prompt(topic, industry, target_audience, config)
# Extract grounding metadata for detailed UI display
grounding_metadata = self._extract_grounding_metadata(gemini_result)
# Extract search widget and queries for UI display
search_widget = gemini_result.get("search_widget", "") or ""
search_queries = gemini_result.get("search_queries", []) or []
# Route to appropriate provider
if config.provider == ResearchProvider.EXA:
# Exa research workflow
from .exa_provider import ExaResearchProvider
from services.subscription.preflight_validator import validate_exa_research_operations
from services.database import get_db
from services.subscription import PricingService
import os
await task_manager.update_progress(task_id, "🌐 Connecting to Exa neural search...")
# Pre-flight validation
db_val = next(get_db())
try:
pricing_service = PricingService(db_val)
gpt_provider = os.getenv("GPT_PROVIDER", "google")
validate_exa_research_operations(pricing_service, user_id, gpt_provider)
except HTTPException as http_error:
logger.error(f"Subscription limit exceeded for Exa research: {http_error.detail}")
await task_manager.update_progress(task_id, f"❌ Subscription limit exceeded: {http_error.detail.get('message', str(http_error.detail)) if isinstance(http_error.detail, dict) else str(http_error.detail)}")
raise
finally:
db_val.close()
# Execute Exa search
await task_manager.update_progress(task_id, "🤖 Executing Exa neural search...")
try:
exa_provider = ExaResearchProvider()
raw_result = await exa_provider.search(
research_prompt, topic, industry, target_audience, config, user_id
)
# Track usage
cost = raw_result.get('cost', {}).get('total', 0.005) if isinstance(raw_result.get('cost'), dict) else 0.005
exa_provider.track_exa_usage(user_id, cost)
# Extract content for downstream analysis
content = raw_result.get('content', '')
sources = raw_result.get('sources', [])
search_widget = "" # Exa doesn't provide search widgets
search_queries = raw_result.get('search_queries', [])
grounding_metadata = None # Exa doesn't provide grounding metadata
except RuntimeError as e:
if "EXA_API_KEY not configured" in str(e):
logger.warning("Exa not configured, falling back to Google")
await task_manager.update_progress(task_id, "⚠️ Exa not configured, falling back to Google Search")
config.provider = ResearchProvider.GOOGLE
# Continue to Google flow below
else:
raise
if config.provider != ResearchProvider.EXA:
# Google research (existing flow)
from .google_provider import GoogleResearchProvider
await task_manager.update_progress(task_id, "🌐 Connecting to Google Search grounding...")
google_provider = GoogleResearchProvider()
await task_manager.update_progress(task_id, "🤖 Making AI request to Gemini with Google Search grounding...")
try:
gemini_result = await google_provider.search(
research_prompt, topic, industry, target_audience, config, user_id
)
except HTTPException as http_error:
logger.error(f"Subscription limit exceeded for Google research: {http_error.detail}")
await task_manager.update_progress(task_id, f"❌ Subscription limit exceeded: {http_error.detail.get('message', str(http_error.detail)) if isinstance(http_error.detail, dict) else str(http_error.detail)}")
raise
await task_manager.update_progress(task_id, "📊 Processing research results and extracting insights...")
# Extract sources and content
sources = self._extract_sources_from_grounding(gemini_result)
content = gemini_result.get("content", "")
search_widget = gemini_result.get("search_widget", "") or ""
search_queries = gemini_result.get("search_queries", []) or []
grounding_metadata = self._extract_grounding_metadata(gemini_result)
# Continue with common analysis (same for both providers)
await task_manager.update_progress(task_id, "🔍 Analyzing keywords and content angles...")
# Parse the comprehensive response for different analysis components
content = gemini_result.get("content", "")
keyword_analysis = self.keyword_analyzer.analyze(content, request.keywords, user_id=user_id)
competitor_analysis = self.competitor_analyzer.analyze(content, user_id=user_id)
suggested_angles = self.content_angle_generator.generate(content, topic, industry, user_id=user_id)

View File

@@ -0,0 +1,234 @@
"""
Research Strategy Pattern Implementation
Different strategies for executing research based on depth and focus.
"""
from abc import ABC, abstractmethod
from typing import Dict, Any
from loguru import logger
from models.blog_models import BlogResearchRequest, ResearchMode, ResearchConfig
from .keyword_analyzer import KeywordAnalyzer
from .competitor_analyzer import CompetitorAnalyzer
from .content_angle_generator import ContentAngleGenerator
class ResearchStrategy(ABC):
    """Common base for research strategies.

    Each instance owns its own keyword, competitor and content-angle
    analyzers; subclasses supply the prompt text and report their mode.
    """

    def __init__(self):
        self.keyword_analyzer = KeywordAnalyzer()
        self.competitor_analyzer = CompetitorAnalyzer()
        self.content_angle_generator = ContentAngleGenerator()

    @abstractmethod
    def build_research_prompt(
        self,
        topic: str,
        industry: str,
        target_audience: str,
        config: ResearchConfig,
    ) -> str:
        """Return the research prompt text for this strategy."""

    @abstractmethod
    def get_mode(self) -> ResearchMode:
        """Return the ResearchMode this strategy is responsible for."""
class BasicResearchStrategy(ResearchStrategy):
    """Lightweight strategy: trends, statistics, keywords and five content angles."""

    def get_mode(self) -> ResearchMode:
        """Identify this strategy as the BASIC mode handler."""
        return ResearchMode.BASIC

    def build_research_prompt(
        self,
        topic: str,
        industry: str,
        target_audience: str,
        config: ResearchConfig,
    ) -> str:
        """Return the fixed-format basic prompt; ``config`` is not consulted."""
        return f"""You are a professional blog content strategist researching for a {industry} blog targeting {target_audience}.
Research Topic: "{topic}"
Provide analysis in this EXACT format:
## CURRENT TRENDS (2024-2025)
- [Trend 1 with specific data and source URL]
- [Trend 2 with specific data and source URL]
- [Trend 3 with specific data and source URL]
## KEY STATISTICS
- [Statistic 1: specific number/percentage with source URL]
- [Statistic 2: specific number/percentage with source URL]
- [Statistic 3: specific number/percentage with source URL]
- [Statistic 4: specific number/percentage with source URL]
- [Statistic 5: specific number/percentage with source URL]
## PRIMARY KEYWORDS
1. "{topic}" (main keyword)
2. [Variation 1]
3. [Variation 2]
## SECONDARY KEYWORDS
[5 related keywords for blog content]
## CONTENT ANGLES (Top 5)
1. [Angle 1: specific unique approach]
2. [Angle 2: specific unique approach]
3. [Angle 3: specific unique approach]
4. [Angle 4: specific unique approach]
5. [Angle 5: specific unique approach]
REQUIREMENTS:
- Cite EVERY claim with authoritative source URLs
- Use 2024-2025 data when available
- Include specific numbers, dates, examples
- Focus on actionable blog insights for {target_audience}""".strip()
class ComprehensiveResearchStrategy(ResearchStrategy):
    """Full-depth strategy covering every analysis section."""

    def get_mode(self) -> ResearchMode:
        """Identify this strategy as the COMPREHENSIVE mode handler."""
        return ResearchMode.COMPREHENSIVE

    def build_research_prompt(
        self,
        topic: str,
        industry: str,
        target_audience: str,
        config: ResearchConfig,
    ) -> str:
        """Return the comprehensive prompt.

        When ``config.date_range`` or ``config.source_types`` is set, a
        "Date Focus" / "Priority Sources" line is appended after the topic.
        """
        # Optional header lines; each stays empty when its setting is falsy.
        date_filter = ""
        if config.date_range:
            readable_range = config.date_range.value.replace('_', ' ')
            date_filter = f"\nDate Focus: {readable_range}"

        source_filter = ""
        if config.source_types:
            source_names = ', '.join(source.value for source in config.source_types)
            source_filter = f"\nPriority Sources: {source_names}"

        return f"""You are a senior blog content strategist conducting comprehensive research for a {industry} blog targeting {target_audience}.
Research Topic: "{topic}"{date_filter}{source_filter}
Provide COMPLETE analysis in this EXACT format:
## TRENDS AND INSIGHTS (2024-2025)
[5-7 trends with specific data, numbers, and source URLs]
## KEY STATISTICS
[7-10 statistics with exact numbers, percentages, dates, and source URLs]
## EXPERT OPINIONS
[4-5 expert quotes with full attribution and source URLs]
## RECENT DEVELOPMENTS
[5-7 recent news/developments with dates and source URLs]
## MARKET ANALYSIS
[3-5 market insights with data points and source URLs]
## BEST PRACTICES & CASE STUDIES
[3-5 examples with specific outcomes/metrics and source URLs]
## KEYWORD ANALYSIS
Primary Keywords: [3 main variations]
Secondary Keywords: [7-10 related keywords]
Long-Tail Opportunities: [5-7 specific search phrases]
## COMPETITOR ANALYSIS
Top Competitors: [5 competitors with brief descriptions]
Content Gaps: [5 topics competitors are missing]
Competitive Advantages: [5 unique angles we can own]
## CONTENT ANGLES (Exactly 5)
1. [Unique angle with reasoning and target benefit]
2. [Unique angle with reasoning and target benefit]
3. [Unique angle with reasoning and target benefit]
4. [Unique angle with reasoning and target benefit]
5. [Unique angle with reasoning and target benefit]
VERIFICATION REQUIREMENTS:
- Minimum 2 authoritative sources per major claim
- Prioritize: Industry publications > Research papers > News > Blogs
- 2024-2025 data strongly preferred
- All numbers must include context (timeframe, sample size, methodology)
- Every recommendation must be actionable for {target_audience}""".strip()
class TargetedResearchStrategy(ResearchStrategy):
    """Strategy whose prompt contains only the sections enabled in the config."""

    def get_mode(self) -> ResearchMode:
        """Identify this strategy as the TARGETED mode handler."""
        return ResearchMode.TARGETED

    def build_research_prompt(
        self,
        topic: str,
        industry: str,
        target_audience: str,
        config: ResearchConfig,
    ) -> str:
        """Return a prompt assembled from the sections switched on in ``config``.

        The ``include_trends``, ``include_statistics``,
        ``include_expert_quotes`` and ``include_competitors`` flags select the
        optional sections, in that order. Keyword analysis and content angles
        are always appended.
        """
        optional_sections = (
            (config.include_trends,
             "## CURRENT TRENDS\n[3-5 trends with data and source URLs]"),
            (config.include_statistics,
             "## KEY STATISTICS\n[5-7 statistics with numbers and source URLs]"),
            (config.include_expert_quotes,
             "## EXPERT OPINIONS\n[3-4 expert quotes with attribution and source URLs]"),
            (config.include_competitors,
             "## COMPETITOR ANALYSIS\nTop Competitors: [3-5]\nContent Gaps: [3-5]"),
        )
        sections = [text for enabled, text in optional_sections if enabled]

        # Always include keywords and angles
        sections.extend([
            "## KEYWORD ANALYSIS\nPrimary: [2-3 variations]\nSecondary: [5-7 keywords]\nLong-Tail: [3-5 phrases]",
            "## CONTENT ANGLES (3-5)\n[Unique blog angles with reasoning]",
        ])
        sections_str = "\n\n".join(sections)

        return f"""You are a blog content strategist conducting targeted research for a {industry} blog targeting {target_audience}.
Research Topic: "{topic}"
Provide focused analysis in this EXACT format:
{sections_str}
REQUIREMENTS:
- Cite all claims with authoritative source URLs
- Include specific numbers, dates, examples
- Focus on actionable insights for {target_audience}
- Use 2024-2025 data when available""".strip()
def get_strategy_for_mode(mode: ResearchMode) -> ResearchStrategy:
    """Instantiate the strategy registered for ``mode``.

    Any mode without a registered strategy falls back to
    ``BasicResearchStrategy``. A new instance is created on every call.
    """
    registry = {
        ResearchMode.BASIC: BasicResearchStrategy,
        ResearchMode.COMPREHENSIVE: ComprehensiveResearchStrategy,
        ResearchMode.TARGETED: TargetedResearchStrategy,
    }
    return registry.get(mode, BasicResearchStrategy)()