Content Calendar, Content Gap Analysis, and Content Optimization

ajaysi
2025-05-27 09:15:08 +05:30
parent 4049d19787
commit 889021c078
100 changed files with 18504 additions and 1251 deletions


@@ -0,0 +1,182 @@
# Content Gap Analysis Tool
A comprehensive AI-powered tool for analyzing content gaps and generating strategic content recommendations.
## Overview
The Content Gap Analysis tool combines several existing AI SEO tools with new analysis components to review your content strategy, identify opportunities, and generate actionable recommendations.
## Workflow Design
### 1. Website Analysis
**Input:** Website URL
**Tools Integration:**
- `analyze_onpage_seo()`: Analyze content quality and structure
- `url_seo_checker()`: Check technical SEO aspects
- `google_pagespeed_insights()`: Assess page performance
**Analysis Components:**
- Content structure mapping
- Topic categorization
- Content depth assessment
- Performance metrics
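
As a rough illustration of the content structure mapping and content depth assessment listed above, the sketch below counts headings and words in raw HTML using only the standard library. The names `StructureMapper` and `map_structure` are hypothetical and are not part of this repository; the real analysis is delegated to the SEO tools named above.

```python
from html.parser import HTMLParser


class StructureMapper(HTMLParser):
    """Collects heading counts and a rough word count from raw HTML."""

    HEADINGS = {"h1", "h2", "h3", "h4", "h5", "h6"}

    def __init__(self) -> None:
        super().__init__()
        self.heading_counts: dict[str, int] = {}
        self.word_count = 0
        self._skip_depth = 0  # > 0 while inside <script> or <style>

    def handle_starttag(self, tag, attrs):
        if tag in ("script", "style"):
            self._skip_depth += 1
        elif tag in self.HEADINGS:
            self.heading_counts[tag] = self.heading_counts.get(tag, 0) + 1

    def handle_endtag(self, tag):
        if tag in ("script", "style") and self._skip_depth:
            self._skip_depth -= 1

    def handle_data(self, data):
        # Count visible text only; script and style bodies are ignored.
        if not self._skip_depth:
            self.word_count += len(data.split())


def map_structure(html: str) -> dict:
    """Return heading distribution and word count for one page."""
    mapper = StructureMapper()
    mapper.feed(html)
    mapper.close()
    return {"headings": mapper.heading_counts, "word_count": mapper.word_count}
```

Calling `map_structure(page_html)` on a fetched page gives a small dictionary that could feed a depth comparison; fetching the page itself is left out of the sketch.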
### 2. Competitor Analysis
**Input:** Competitor URLs
**Tools Integration:**
- `url_seo_checker()`: Analyze competitor URLs
- `analyze_onpage_seo()`: Compare content quality
- `ai_title_generator()`: Analyze title patterns
**Analysis Components:**
- Content strategy comparison
- Topic coverage gaps
- Content format analysis
- Title pattern analysis
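
Topic coverage gaps can be approximated by counting how many analyzed sites cover each topic. The sketch below is a minimal, self-contained version of that idea, assuming each site has already been reduced to a set of topic strings; `find_topic_gaps` and its parameters are illustrative names, and the 50% cutoff is an assumed default.

```python
from collections import Counter


def find_topic_gaps(
    topics_by_site: dict[str, set[str]], max_coverage: float = 0.5
) -> list[dict]:
    """Rank topics covered by fewer than `max_coverage` of the sites."""
    if not topics_by_site:
        return []
    # Number of sites that mention each topic at least once.
    coverage_counts = Counter(
        topic for topics in topics_by_site.values() for topic in topics
    )
    total_sites = len(topics_by_site)
    gaps = []
    for topic, count in coverage_counts.items():
        coverage = count / total_sites
        if coverage < max_coverage:
            gaps.append(
                {"topic": topic, "coverage": coverage, "opportunity_score": 1 - coverage}
            )
    # Highest opportunity first; ties broken alphabetically for stable output.
    return sorted(gaps, key=lambda g: (-g["opportunity_score"], g["topic"]))
```

Topics covered by few sites score highest, on the assumption that thin coverage signals an opening rather than low demand; that assumption should be checked against keyword data.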
### 3. Keyword Research
**Input:** Industry/Niche
**Tools Integration:**
- `ai_title_generator()`: Generate keyword-based titles
- `metadesc_generator_main()`: Analyze meta descriptions for keyword usage
- `ai_structured_data()`: Check structured data implementation
**Analysis Components:**
- Keyword opportunity identification
- Search intent analysis
- Content format suggestions
- Topic clustering
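
Keyword opportunity identification can be sketched as a simple score that discounts search volume by ranking difficulty. The formula `volume * (1 - difficulty / 100)` and the threshold of 50 mirror the keyword researcher added in this commit; the function names, the clamping of difficulty to a 0-100 range, and the input shape are assumptions made for illustration.

```python
def opportunity_score(volume: float, difficulty: float) -> float:
    """Discount search volume by difficulty (assumed to be on a 0-100 scale)."""
    difficulty = min(max(difficulty, 0.0), 100.0)  # clamp out-of-range values
    return volume * (1 - difficulty / 100)


def rank_keywords(keywords: dict[str, dict], threshold: float = 50.0) -> list[dict]:
    """Keep keywords scoring above `threshold`, best opportunity first."""
    ranked = []
    for keyword, data in keywords.items():
        score = opportunity_score(data.get("volume", 0), data.get("difficulty", 0))
        if score > threshold:
            ranked.append({"keyword": keyword, "opportunity_score": score})
    return sorted(ranked, key=lambda k: k["opportunity_score"], reverse=True)
```

Because the score is not normalized, a fixed threshold favors high-volume niches; a per-industry threshold may be a better fit.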
### 4. AI-Powered Recommendations
**Tools Integration:**
- `ai_title_generator()`: Generate content titles
- `metadesc_generator_main()`: Create content summaries
- `ai_structured_data()`: Suggest structured data implementation
**Output Components:**
- Content topic suggestions
- Format recommendations
- Priority scoring
- Implementation timeline
## Implementation Plan
### Phase 1: Core Infrastructure
1. Create base classes and interfaces
2. Implement data collection modules
3. Set up AI model integration
4. Develop data storage system
### Phase 2: Tool Integration
1. Integrate existing SEO tools
2. Create unified API for tool interaction
3. Implement data sharing between tools
4. Develop result aggregation system
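
One possible shape for the unified tool API and result aggregation in Phase 2 is a small registry of callables. This is only a sketch under stated assumptions: `ToolRegistry`, `register`, and `run_all` are hypothetical names, and the `success`/`data`/`error` result shape is borrowed from how the analyzers in this commit report their results.

```python
from typing import Any, Callable

# A tool takes a target (URL or industry string) and returns a result dict.
ToolFn = Callable[[str], dict[str, Any]]


class ToolRegistry:
    """Runs every registered tool against a target and aggregates the results."""

    def __init__(self) -> None:
        self._tools: dict[str, ToolFn] = {}

    def register(self, name: str, fn: ToolFn) -> None:
        self._tools[name] = fn

    def run_all(self, target: str) -> dict[str, dict[str, Any]]:
        results: dict[str, dict[str, Any]] = {}
        for name, fn in self._tools.items():
            try:
                results[name] = {"success": True, "data": fn(target)}
            except Exception as exc:
                # One failing tool should not abort the whole analysis.
                results[name] = {"success": False, "error": str(exc)}
        return results
```

Existing functions such as `analyze_onpage_seo()` could be registered under a name and invoked through `run_all(url)`, provided they accept a single string argument, which is not confirmed here.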
### Phase 3: Analysis Engine
1. Implement content structure analysis
2. Develop competitor analysis algorithms
3. Create keyword research system
4. Build recommendation engine
### Phase 4: UI/UX Development
1. Create step-by-step workflow interface
2. Implement progress tracking
3. Develop visualization components
4. Add export functionality
## Technical Requirements
### Dependencies
- Existing SEO tools from `lib/ai_seo_tools/`
- AI models for content analysis
- Web scraping capabilities
- Data storage system
### File Structure
```
content_gap_analysis/
├── __init__.py
├── main.py
├── website_analyzer.py
├── competitor_analyzer.py
├── keyword_researcher.py
├── recommendation_engine.py
├── utils/
│   ├── __init__.py
│   ├── data_collector.py
│   ├── content_parser.py
│   └── ai_processor.py
└── tests/
    ├── __init__.py
    ├── test_website_analyzer.py
    ├── test_competitor_analyzer.py
    └── test_keyword_researcher.py
```
## Integration Points
### Existing Tools
1. **On-Page SEO Analyzer**
   - Function: `analyze_onpage_seo()`
   - Purpose: Content quality assessment
   - Integration: Content structure analysis
2. **URL SEO Checker**
   - Function: `url_seo_checker()`
   - Purpose: Technical optimization
   - Integration: URL structure analysis
3. **Blog Title Generator**
   - Function: `ai_title_generator()`
   - Purpose: Content ideas
   - Integration: Keyword analysis
4. **Meta Description Generator**
   - Function: `metadesc_generator_main()`
   - Purpose: Content summaries
   - Integration: Content optimization
5. **Structured Data Generator**
   - Function: `ai_structured_data()`
   - Purpose: Rich snippets
   - Integration: Content enhancement
### New Components
1. **Content Structure Analyzer**
   - Purpose: Map website content structure
   - Output: Content hierarchy and relationships
2. **Competitor Content Analyzer**
   - Purpose: Analyze competitor content strategy
   - Output: Content gaps and opportunities
3. **Keyword Opportunity Finder**
   - Purpose: Identify keyword gaps
   - Output: Keyword recommendations
4. **AI Recommendation Engine**
   - Purpose: Generate content recommendations
   - Output: Actionable content strategy
## Future Enhancements
1. **Advanced Analytics**
   - Content performance tracking
   - ROI analysis
   - Trend prediction
2. **Automation Features**
   - Automated content planning
   - Schedule generation
   - Priority scoring
3. **Integration Expansion**
   - CMS integration
   - Analytics platform connection
   - Social media analysis
4. **AI Improvements**
   - Advanced topic modeling
   - Sentiment analysis
   - Content quality scoring


@@ -0,0 +1,36 @@
"""
Content Gap Analysis Tool for Alwrity.
"""
import streamlit as st  # required by run_content_gap_analysis() below

from .ui import ContentGapAnalysisUI
from .main import ContentGapAnalysis
from .keyword_researcher import KeywordResearcher
from .competitor_analyzer import CompetitorAnalyzer
from .website_analyzer import WebsiteAnalyzer
from .recommendation_engine import RecommendationEngine
from .utils.ai_processor import AIProcessor

__all__ = [
    'ContentGapAnalysisUI',
    'ContentGapAnalysis',
    'KeywordResearcher',
    'CompetitorAnalyzer',
    'WebsiteAnalyzer',
    'RecommendationEngine',
    'AIProcessor'
]


def run_content_gap_analysis():
    """Run the Content Gap Analysis tool."""
    # Configure the page first: st.set_page_config() must be the first
    # Streamlit command executed on a page.
    st.set_page_config(
        page_title="Content Gap Analysis",
        page_icon="📊",
        layout="wide"
    )
    # Initialize and run the UI
    ui = ContentGapAnalysisUI()
    ui.run()


@@ -0,0 +1,711 @@
"""
Competitor analyzer for content gap analysis.
"""
from typing import Dict, Any, List, Optional
import streamlit as st
from collections import Counter, defaultdict
from loguru import logger
from lib.utils.website_analyzer.analyzer import WebsiteAnalyzer
from lib.ai_seo_tools.content_gap_analysis.utils.data_collector import DataCollector
from lib.ai_seo_tools.content_gap_analysis.utils.content_parser import ContentParser
from lib.ai_seo_tools.content_gap_analysis.utils.ai_processor import AIProcessor, ProgressTracker
import asyncio
import sys
import os
import json
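from lib.gpt_providers.text_generation.main_text_generation import llm_text_gen
# Needed by _analyze_title_patterns(); without it the call raises NameError
from lib.ai_seo_tools.content_title_generator import ai_title_generator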
# Ensure logs directory exists before attaching the file sink
os.makedirs("logs", exist_ok=True)
# Configure logger
# Note: loguru's logger is process-wide, so logger.remove() also drops sinks
# that other modules registered earlier.
logger.remove()  # Remove all existing handlers, including the default one
logger.add(
    "logs/competitor_analyzer.log",
    rotation="50 MB",
    retention="10 days",
    level="DEBUG",
    format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}"
)
logger.add(
    sys.stdout,
    level="INFO",
    format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{message}</cyan>"
)
class CompetitorAnalyzer:
"""Analyzes competitor content and market position."""
def __init__(self):
"""Initialize the competitor analyzer."""
self.website_analyzer = WebsiteAnalyzer()
self.ai_processor = AIProcessor()
self.progress = ProgressTracker()
# Define analysis stages
self.stages = {
'competitor_analysis': {
'name': 'Competitor Analysis',
'steps': [
'Initializing competitor analysis',
'Analyzing competitor content',
'Evaluating market position',
'Identifying content gaps',
'Generating competitive insights'
]
}
}
logger.info("CompetitorAnalyzer initialized")
    def analyze(self, competitor_urls: List[str], industry: str) -> Dict[str, Any]:
        """
        Analyze competitor websites.
        Args:
            competitor_urls: List of competitor URLs to analyze
            industry: Industry category
        Returns:
            Dictionary containing competitor analysis results
        """
        try:
            results = {
                'competitors': [],
                'market_position': {},
                'content_gaps': [],
                'advantages': []
            }
            # Analyze each competitor; sites whose analysis fails are skipped
            for url in competitor_urls:
                competitor_analysis = self.website_analyzer.analyze_website(url)
                if competitor_analysis.get('success', False):
                    results['competitors'].append({
                        'url': url,
                        'analysis': competitor_analysis['data']
                    })
            # Generate market position analysis using AI
            prompt = f"""Analyze the market position of competitors in the {industry} industry:
Competitor Analyses:
{json.dumps(results['competitors'], indent=2)}
Provide:
1. Market position analysis
2. Content gaps
3. Competitive advantages
Format the response as JSON with 'market_position', 'content_gaps', and 'advantages' keys."""
            # Get AI analysis
            analysis = llm_text_gen(
                prompt=prompt,
                system_prompt="You are an SEO expert specializing in competitive analysis.",
                response_format="json_object"
            )
            # Assumption: llm_text_gen may return the JSON as text; decode it before use
            if isinstance(analysis, str):
                analysis = json.loads(analysis)
            if isinstance(analysis, dict):
                results['market_position'] = analysis.get('market_position', {})
                results['content_gaps'] = analysis.get('content_gaps', [])
                results['advantages'] = analysis.get('advantages', [])
            return results
        except Exception as e:
            error_msg = f"Error analyzing competitors: {str(e)}"
            # loguru has no exc_info argument; logger.exception() records the traceback
            logger.exception(error_msg)
            return {
                'error': error_msg,
                'competitors': [],
                'market_position': {},
                'content_gaps': [],
                'advantages': []
            }
def _analyze_competitor_content(self, competitor_urls: List[str]) -> Dict[str, Any]:
"""Analyze competitor content."""
try:
content_analysis = {}
for url in competitor_urls:
# Get AI analysis for each competitor
analysis = self.ai_processor.analyze_content({
'url': url,
'content': {} # Content will be fetched by AI processor
})
content_analysis[url] = {
'content_metrics': analysis.get('content_metrics', {}),
'content_evolution': analysis.get('content_evolution', {}),
'topic_trends': analysis.get('topic_trends', {}),
'performance_trends': analysis.get('performance_trends', {})
}
return content_analysis
except Exception as e:
st.error(f"Error analyzing competitor content: {str(e)}")
return {}
def _evaluate_market_position(self, content_analysis: Dict[str, Any], industry: str) -> Dict[str, Any]:
"""Evaluate market position."""
try:
market_position = {
'industry_rank': 0,
'content_quality_rank': 0,
'market_share': 0,
'competitive_advantages': [],
'competitive_disadvantages': []
}
# Average content quality score across competitors (stored under 'content_quality_rank')
content_quality_scores = [
analysis.get('content_metrics', {}).get('quality_score', 0)
for analysis in content_analysis.values()
]
if content_quality_scores:
market_position['content_quality_rank'] = sum(content_quality_scores) / len(content_quality_scores)
# Identify competitive advantages and disadvantages
for url, analysis in content_analysis.items():
quality_score = analysis.get('content_metrics', {}).get('quality_score', 0)
if quality_score > market_position['content_quality_rank']:
market_position['competitive_advantages'].append({
'url': url,
'advantage': 'Higher content quality',
'score': quality_score
})
elif quality_score < market_position['content_quality_rank']:
market_position['competitive_disadvantages'].append({
'url': url,
'disadvantage': 'Lower content quality',
'score': quality_score
})
return market_position
except Exception as e:
st.error(f"Error evaluating market position: {str(e)}")
return {}
def _identify_content_gaps(self, content_analysis: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Identify content gaps."""
try:
content_gaps = []
# Analyze content coverage
all_topics = set()
for analysis in content_analysis.values():
topics = analysis.get('topic_trends', {}).get('topics', [])
all_topics.update(topics)
# Identify missing topics for each competitor
for url, analysis in content_analysis.items():
covered_topics = set(analysis.get('topic_trends', {}).get('topics', []))
missing_topics = all_topics - covered_topics
if missing_topics:
content_gaps.append({
'url': url,
'missing_topics': list(missing_topics),
'gap_type': 'topic_coverage'
})
return content_gaps
except Exception as e:
st.error(f"Error identifying content gaps: {str(e)}")
return []
def _generate_competitive_insights(self, content_analysis: Dict[str, Any], market_position: Dict[str, Any], content_gaps: List[Dict[str, Any]]) -> List[str]:
"""Generate competitive insights."""
try:
insights = []
# Market position insights
if market_position.get('content_quality_rank', 0) > 80:
insights.append("Strong market position with high content quality")
elif market_position.get('content_quality_rank', 0) > 60:
insights.append("Moderate market position with room for improvement")
else:
insights.append("Weak market position requiring significant improvement")
# Content gap insights
if content_gaps:
insights.append(f"Identified {len(content_gaps)} content gaps across competitors")
# Competitive advantage insights
if market_position.get('competitive_advantages'):
insights.append(f"Found {len(market_position['competitive_advantages'])} competitive advantages")
return insights
except Exception as e:
st.error(f"Error generating competitive insights: {str(e)}")
return []
def _run_seo_analysis(self, url: str) -> dict:
"""
Run SEO analysis on competitor website.
Args:
url (str): The URL to analyze
Returns:
dict: SEO analysis results
"""
# Run website analysis using the new analyzer
analysis = self.website_analyzer.analyze_website(url)
if not analysis.get('success', False):
return {
'error': analysis.get('error', 'Unknown error in SEO analysis'),
'onpage_seo': {},
'url_seo': {}
}
# Extract SEO information from the analysis
seo_info = analysis['data']['analysis']['seo_info']
basic_info = analysis['data']['analysis']['basic_info']
return {
'onpage_seo': {
'meta_tags': seo_info.get('meta_tags', {}),
'content': seo_info.get('content', {}),
'recommendations': seo_info.get('recommendations', [])
},
'url_seo': {
'title': basic_info.get('title', ''),
'meta_description': basic_info.get('meta_description', ''),
'has_robots_txt': bool(basic_info.get('robots_txt')),
'has_sitemap': bool(basic_info.get('sitemap'))
}
}
def _analyze_title_patterns(self, url: str) -> dict:
"""
Analyze title patterns using the title generator.
Args:
url (str): The URL to analyze
Returns:
dict: Title pattern analysis results
"""
# Use title generator to analyze patterns
title_analysis = ai_title_generator(url)
return {
'patterns': title_analysis.get('patterns', {}),
'suggestions': title_analysis.get('suggestions', [])
}
def _compare_competitors(self, results: dict) -> dict:
"""
Compare results across all competitors.
Args:
results (dict): Analysis results for all competitors
Returns:
dict: Comparative analysis results
"""
comparison = {
'content_comparison': self._compare_content(results),
'seo_comparison': self._compare_seo(results),
'title_comparison': self._compare_titles(results),
'performance_metrics': self._compare_performance(results),
'content_gaps': self._identify_content_gaps(results)
}
# Add AI-enhanced insights
comparison['ai_insights'] = self.ai_processor.analyze_competitor_comparison(comparison)
return comparison
def _compare_content(self, results: dict) -> dict:
"""Compare content structure across competitors."""
content_comparison = {
'topic_distribution': self._analyze_topic_distribution(results),
'content_depth': self._analyze_content_depth(results),
'content_formats': self._analyze_content_formats(results),
'content_quality': self._analyze_content_quality(results)
}
return content_comparison
def _analyze_topic_distribution(self, results: dict) -> dict:
"""Analyze topic distribution across competitors."""
all_topics = []
topic_frequency = Counter()
for url, data in results.items():
topics = data['content_structure'].get('topics', [])
all_topics.extend([t['topic'] for t in topics])
topic_frequency.update([t['topic'] for t in topics])
return {
'common_topics': [topic for topic, count in topic_frequency.most_common(10)],
'unique_topics': list(set(all_topics)),
'topic_frequency': dict(topic_frequency.most_common()),
'topic_coverage': len(set(all_topics)) / len(all_topics) if all_topics else 0
}
def _analyze_content_depth(self, results: dict) -> dict:
"""Analyze content depth across competitors."""
depth_metrics = {
'word_counts': {},
'section_counts': {},
'heading_distribution': defaultdict(list),
'content_hierarchy': {}
}
for url, data in results.items():
content_structure = data['content_structure']
# Word count analysis
depth_metrics['word_counts'][url] = content_structure.get('text_statistics', {}).get('word_count', 0)
# Section analysis
depth_metrics['section_counts'][url] = len(content_structure.get('sections', []))
# Heading distribution
for level, count in content_structure.get('hierarchy', {}).get('heading_distribution', {}).items():
depth_metrics['heading_distribution'][level].append(count)
# Content hierarchy
depth_metrics['content_hierarchy'][url] = content_structure.get('hierarchy', {})
return depth_metrics
def _analyze_content_formats(self, results: dict) -> dict:
"""Analyze content formats across competitors."""
format_analysis = {
'format_types': defaultdict(int),
'format_distribution': defaultdict(list),
'format_effectiveness': {}
}
for url, data in results.items():
sections = data['content_structure'].get('sections', [])
for section in sections:
format_type = section.get('type', 'unknown')
format_analysis['format_types'][format_type] += 1
format_analysis['format_distribution'][format_type].append({
'url': url,
'heading': section.get('heading', ''),
'word_count': section.get('word_count', 0)
})
return format_analysis
def _analyze_content_quality(self, results: dict) -> dict:
"""Analyze content quality across competitors."""
quality_metrics = {
'readability_scores': {},
'content_structure_scores': {},
'engagement_metrics': {},
'overall_quality': {}
}
for url, data in results.items():
content_structure = data['content_structure']
# Readability analysis
readability = content_structure.get('readability', {})
quality_metrics['readability_scores'][url] = {
'flesch_score': readability.get('flesch_score', 0),
'avg_sentence_length': readability.get('avg_sentence_length', 0),
'avg_word_length': readability.get('avg_word_length', 0)
}
# Structure analysis
hierarchy = content_structure.get('hierarchy', {})
quality_metrics['content_structure_scores'][url] = {
'has_proper_hierarchy': hierarchy.get('has_proper_hierarchy', False),
'heading_distribution': hierarchy.get('heading_distribution', {}),
'max_depth': hierarchy.get('max_depth', 0)
}
return quality_metrics
def _compare_seo(self, results: dict) -> dict:
"""Compare SEO metrics across competitors."""
seo_comparison = {
'onpage_metrics': defaultdict(list),
'technical_metrics': defaultdict(list),
'content_metrics': defaultdict(list),
'overall_seo_score': {}
}
for url, data in results.items():
seo_info = data.get('website_analysis', {}).get('analysis', {}).get('seo_info', {})
# On-page SEO metrics
meta_tags = seo_info.get('meta_tags', {})
seo_comparison['onpage_metrics']['title_score'].append(
100 if meta_tags.get('title', {}).get('status') == 'good' else 50
)
seo_comparison['onpage_metrics']['description_score'].append(
100 if meta_tags.get('description', {}).get('status') == 'good' else 50
)
seo_comparison['onpage_metrics']['keywords_score'].append(
100 if meta_tags.get('keywords', {}).get('status') == 'good' else 50
)
# Technical SEO metrics
technical = data.get('website_analysis', {}).get('analysis', {}).get('basic_info', {})
seo_comparison['technical_metrics']['has_robots_txt'].append(
100 if technical.get('robots_txt') else 0
)
seo_comparison['technical_metrics']['has_sitemap'].append(
100 if technical.get('sitemap') else 0
)
# Content SEO metrics
content = seo_info.get('content', {})
seo_comparison['content_metrics']['readability_score'].append(
content.get('readability_score', 0)
)
seo_comparison['content_metrics']['content_quality_score'].append(
content.get('content_quality_score', 0)
)
# Overall SEO score
seo_comparison['overall_seo_score'][url] = seo_info.get('overall_score', 0)
return seo_comparison
def _compare_titles(self, results: dict) -> dict:
"""Compare title patterns across competitors."""
title_comparison = {
'pattern_distribution': defaultdict(int),
'length_distribution': defaultdict(list),
'keyword_usage': defaultdict(int),
'format_preferences': defaultdict(int)
}
for url, data in results.items():
title_patterns = data['title_patterns']
# Pattern analysis
for pattern in title_patterns.get('patterns', {}):
title_comparison['pattern_distribution'][pattern] += 1
# Length analysis
for suggestion in title_patterns.get('suggestions', []):
title_comparison['length_distribution'][len(suggestion)].append(suggestion)
# Keyword analysis
for suggestion in title_patterns.get('suggestions', []):
words = suggestion.lower().split()
for word in words:
if len(word) > 3: # Filter out short words
title_comparison['keyword_usage'][word] += 1
return title_comparison
def _compare_performance(self, results: dict) -> dict:
"""Compare performance metrics across competitors."""
performance_metrics = {
'content_effectiveness': {},
'engagement_metrics': {},
'technical_performance': {},
'overall_performance': {}
}
for url, data in results.items():
# Content effectiveness
content_structure = data['content_structure']
performance_metrics['content_effectiveness'][url] = {
'content_depth': content_structure.get('text_statistics', {}).get('word_count', 0),
'content_quality': content_structure.get('readability', {}).get('flesch_score', 0),
'content_structure': content_structure.get('hierarchy', {}).get('has_proper_hierarchy', False)
}
# Technical performance
seo_analysis = data['seo_analysis']
performance_metrics['technical_performance'][url] = {
'onpage_score': sum(1 for v in seo_analysis.get('onpage_seo', {}).values() if v),
'technical_score': sum(1 for v in seo_analysis.get('url_seo', {}).values() if v)
}
return performance_metrics
def _find_missing_topics(self, results: dict) -> List[Dict[str, Any]]:
"""Find topics that are missing or underrepresented."""
all_topics = set()
topic_coverage = defaultdict(int)
# Collect all topics and their coverage
for url, data in results.items():
topics = data['content_structure'].get('topics', [])
for topic in topics:
all_topics.add(topic['topic'])
topic_coverage[topic['topic']] += 1
# Identify missing or underrepresented topics
missing_topics = []
total_competitors = len(results)
for topic in all_topics:
coverage = topic_coverage[topic] / total_competitors
if coverage < 0.5: # Topic covered by less than 50% of competitors
missing_topics.append({
'topic': topic,
'coverage': coverage,
'opportunity_score': 1 - coverage
})
return sorted(missing_topics, key=lambda x: x['opportunity_score'], reverse=True)
    def _identify_opportunities(self, results: dict) -> List[Dict[str, Any]]:
        """Identify content opportunities based on analysis."""
        opportunities = []
        if not results:
            return opportunities  # Nothing to compare; also avoids dividing by zero
        # Analyze content depth opportunities
        depth_metrics = self._analyze_content_depth(results)
        avg_word_count = sum(depth_metrics['word_counts'].values()) / len(depth_metrics['word_counts'])
        for url, word_count in depth_metrics['word_counts'].items():
            if word_count < avg_word_count * 0.7:  # Content depth significantly below average
                opportunities.append({
                    'type': 'content_depth',
                    'url': url,
                    'current_value': word_count,
                    'target_value': avg_word_count,
                    'opportunity_score': (avg_word_count - word_count) / avg_word_count
                })
        # Analyze format opportunities
        format_analysis = self._analyze_content_formats(results)
        for format_type, distribution in format_analysis['format_distribution'].items():
            # Count distinct competitors, not sections: one site may use a format many times
            coverage = len({entry['url'] for entry in distribution}) / len(results)
            if coverage < 0.3:  # Format used by less than 30% of competitors
                opportunities.append({
                    'type': 'content_format',
                    'format': format_type,
                    'current_coverage': coverage,
                    'opportunity_score': 1 - coverage
                })
        return sorted(opportunities, key=lambda x: x['opportunity_score'], reverse=True)
    def _analyze_format_gaps(self, results: dict) -> List[Dict[str, Any]]:
        """Analyze gaps in content formats."""
        format_gaps = []
        format_analysis = self._analyze_content_formats(results)
        # Identify underutilized formats
        for format_type, distribution in format_analysis['format_distribution'].items():
            # 'format_types' counts sections, so count distinct competitors here instead
            count = len({entry['url'] for entry in distribution})
            if count < len(results) * 0.3:  # Format used by less than 30% of competitors
                format_gaps.append({
                    'format': format_type,
                    'current_usage': count,
                    'potential_impact': 'high' if count < len(results) * 0.2 else 'medium',
                    'suggested_implementation': self._generate_format_suggestions(format_type)
                })
        return format_gaps
    def _analyze_quality_gaps(self, results: dict) -> List[Dict[str, Any]]:
        """Analyze gaps in content quality."""
        quality_gaps = []
        quality_metrics = self._analyze_content_quality(results)
        # Analyze readability gaps
        readability_scores = quality_metrics['readability_scores']
        if not readability_scores:
            return quality_gaps  # No competitors analyzed; avoid dividing by zero
        avg_flesch = sum(score['flesch_score'] for score in readability_scores.values()) / len(readability_scores)
        for url, scores in readability_scores.items():
            if scores['flesch_score'] < avg_flesch * 0.8:  # Readability significantly below average
                quality_gaps.append({
                    'type': 'readability',
                    'url': url,
                    'current_score': scores['flesch_score'],
                    'target_score': avg_flesch,
                    'improvement_needed': avg_flesch - scores['flesch_score']
                })
        return quality_gaps
    def _analyze_seo_gaps(self, results: dict) -> List[Dict[str, Any]]:
        """Analyze gaps in SEO implementation."""
        seo_gaps = []
        seo_comparison = self._compare_seo(results)
        # On-page, technical, and content metrics are checked the same way;
        # only the gap label differs.
        metric_groups = [
            ('onpage_seo', 'onpage_metrics'),
            ('technical_seo', 'technical_metrics'),
            ('content_seo', 'content_metrics'),
        ]
        for gap_type, group in metric_groups:
            for metric, values in seo_comparison[group].items():
                avg_value = sum(values) / len(values)
                for url, value in zip(results.keys(), values):
                    if value < avg_value * 0.7:  # Significantly below average
                        seo_gaps.append({
                            'type': gap_type,
                            'metric': metric,
                            'url': url,
                            'current_value': value,
                            'target_value': avg_value,
                            'improvement_needed': avg_value - value
                        })
        return seo_gaps
def _generate_format_suggestions(self, format_type: str) -> List[str]:
"""Generate suggestions for implementing specific content formats."""
format_suggestions = {
'article': [
'Create in-depth articles with comprehensive coverage',
'Include expert quotes and statistics',
'Add visual elements and infographics'
],
'blog_post': [
'Write engaging blog posts with personal insights',
'Include call-to-actions',
'Add social sharing buttons'
],
'how-to': [
'Create step-by-step guides',
'Include screenshots or videos',
'Add troubleshooting sections'
],
'case_study': [
'Present real-world examples',
'Include metrics and results',
'Add client testimonials'
]
}
return format_suggestions.get(format_type, [
'Research successful examples',
'Analyze competitor implementation',
'Create unique value proposition'
])


@@ -0,0 +1,649 @@
"""
Keyword researcher for content gap analysis.
"""
from typing import Dict, Any, List, Optional
import streamlit as st
from loguru import logger
from lib.utils.website_analyzer.analyzer import WebsiteAnalyzer
from lib.ai_seo_tools.content_gap_analysis.utils.data_collector import DataCollector
from lib.ai_seo_tools.content_gap_analysis.utils.content_parser import ContentParser
from lib.ai_seo_tools.content_gap_analysis.utils.ai_processor import AIProcessor, ProgressTracker
import asyncio
import sys
import os
import json
from lib.gpt_providers.text_generation.main_text_generation import llm_text_gen
from lib.ai_seo_tools.content_title_generator import ai_title_generator
from lib.ai_seo_tools.meta_desc_generator import metadesc_generator_main
from lib.ai_seo_tools.seo_structured_data import ai_structured_data
# Ensure logs directory exists before attaching the file sink
os.makedirs("logs", exist_ok=True)
# Configure logger
# Note: logger.remove() clears every handler on loguru's shared logger,
# including sinks added by modules imported before this one.
logger.remove()  # Remove all existing handlers, including the default one
logger.add(
    "logs/keyword_researcher.log",
    rotation="50 MB",
    retention="10 days",
    level="DEBUG",
    format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}"
)
logger.add(
    sys.stdout,
    level="INFO",
    format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{message}</cyan>"
)
class KeywordResearcher:
"""Researches and analyzes keywords for content strategy."""
def __init__(self):
"""Initialize the keyword researcher."""
self.ai_processor = AIProcessor()
self.progress = ProgressTracker()
# Define analysis stages
self.stages = {
'keyword_analysis': {
'name': 'Keyword Analysis',
'steps': [
'Initializing keyword research',
'Analyzing keyword trends',
'Evaluating search intent',
'Identifying opportunities',
'Generating keyword insights'
]
}
}
def analyze(self, industry: str, url: str) -> Dict[str, Any]:
"""
Analyze keywords for content strategy.
Args:
industry: Industry category
url: Target website URL
Returns:
Dictionary containing analysis results
"""
try:
self.progress.start_stage('keyword_analysis')
self.progress.next_step()
# Analyze keyword trends
trend_analysis = self._analyze_keyword_trends(industry)
self.progress.next_step()
# Evaluate search intent
intent_analysis = self._evaluate_search_intent(trend_analysis)
self.progress.next_step()
# Identify opportunities
opportunities = self._identify_opportunities(trend_analysis, intent_analysis)
self.progress.next_step()
# Generate insights
insights = self._generate_keyword_insights(trend_analysis, intent_analysis, opportunities)
self.progress.next_step()
self.progress.complete_stage()
return {
'trend_analysis': trend_analysis,
'intent_analysis': intent_analysis,
'opportunities': opportunities,
'insights': insights
}
except Exception as e:
if self.progress.current_stage:
self.progress.update_progress(0, f"Error in {self.progress.stages[self.progress.current_stage]['name']}: {str(e)}")
st.error(f"Error analyzing keywords: {str(e)}")
return {
'error': str(e),
'trend_analysis': {},
'intent_analysis': {},
'opportunities': [],
'insights': []
}
def _analyze_keyword_trends(self, industry: str) -> Dict[str, Any]:
"""Analyze keyword trends."""
try:
# Get AI analysis for keyword trends
analysis = self.ai_processor.analyze_keywords({
'industry': industry,
'keywords': {} # Keywords will be fetched by AI processor
})
return {
'trends': analysis.get('keyword_trends', {}),
'search_intent': analysis.get('search_intent', {}),
'keyword_insights': analysis.get('keyword_insights', {})
}
except Exception as e:
st.error(f"Error analyzing keyword trends: {str(e)}")
return {}
def _evaluate_search_intent(self, trend_analysis: Dict[str, Any]) -> Dict[str, Any]:
"""Evaluate search intent."""
try:
intent_analysis = {
'informational': [],
'transactional': [],
'navigational': [],
'commercial': []
}
# Categorize keywords by intent
for keyword, data in trend_analysis.get('trends', {}).items():
intent = data.get('intent', 'informational')
if intent in intent_analysis:
intent_analysis[intent].append({
'keyword': keyword,
'volume': data.get('volume', 0),
'difficulty': data.get('difficulty', 0)
})
return intent_analysis
except Exception as e:
st.error(f"Error evaluating search intent: {str(e)}")
return {}
def _identify_opportunities(self, trend_analysis: Dict[str, Any], intent_analysis: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Identify keyword opportunities."""
try:
opportunities = []
# Analyze each intent category
for intent, keywords in intent_analysis.items():
for keyword_data in keywords:
# Opportunity score: search volume discounted by difficulty (assumed to be on a 0-100 scale)
volume = keyword_data.get('volume', 0)
difficulty = keyword_data.get('difficulty', 0)
opportunity_score = volume * (1 - difficulty/100)
if opportunity_score > 50: # Threshold for good opportunities
opportunities.append({
'keyword': keyword_data['keyword'],
'intent': intent,
'volume': volume,
'difficulty': difficulty,
'opportunity_score': opportunity_score
})
# Sort by opportunity score
opportunities.sort(key=lambda x: x['opportunity_score'], reverse=True)
return opportunities
except Exception as e:
st.error(f"Error identifying opportunities: {str(e)}")
return []
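The opportunity score above is search volume discounted by difficulty, so it is expressed in volume units and the `> 50` threshold acts as a discounted-volume cutoff. A small worked sketch, assuming difficulty is on a 0-100 scale as the formula implies; the function name and sample keywords are illustrative only:

```python
def opportunity_score(volume: float, difficulty: float) -> float:
    """Search volume discounted by difficulty (difficulty assumed to be 0-100)."""
    return volume * (1 - difficulty / 100)


# Illustrative numbers only, not real search metrics
samples = [
    {"keyword": "content audit template", "volume": 400, "difficulty": 35},
    {"keyword": "seo", "volume": 90000, "difficulty": 100},
    {"keyword": "niche term", "volume": 60, "difficulty": 20},
]

scored = sorted(
    (
        {**s, "opportunity_score": opportunity_score(s["volume"], s["difficulty"])}
        for s in samples
    ),
    key=lambda s: s["opportunity_score"],
    reverse=True,
)
# Same threshold as _identify_opportunities
kept = [s for s in scored if s["opportunity_score"] > 50]
```

A keyword at difficulty 100 always scores 0 regardless of volume, and a low-volume keyword can fall under the threshold even when it is easy to rank for.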
def _generate_keyword_insights(self, trend_analysis: Dict[str, Any], intent_analysis: Dict[str, Any], opportunities: List[Dict[str, Any]]) -> List[str]:
"""Generate keyword insights."""
try:
insights = []
# Trend insights
if trend_analysis.get('trends'):
insights.append(f"Analyzed {len(trend_analysis['trends'])} keywords for trends")
# Intent insights
for intent, keywords in intent_analysis.items():
if keywords:
insights.append(f"Found {len(keywords)} {intent} keywords")
# Opportunity insights
if opportunities:
insights.append(f"Identified {len(opportunities)} high-potential keyword opportunities")
return insights
except Exception as e:
st.error(f"Error generating keyword insights: {str(e)}")
return []
def _generate_titles(self, industry: str) -> dict:
"""
Generate keyword-based titles using the title generator.
Args:
industry (str): The industry to generate titles for
Returns:
dict: Generated titles and patterns
"""
return ai_title_generator(industry)
def _analyze_meta_descriptions(self, industry: str) -> dict:
"""
Analyze meta descriptions for keyword usage.
Args:
industry (str): The industry to analyze
Returns:
dict: Meta description analysis results
"""
return metadesc_generator_main(industry)
def _analyze_structured_data(self, industry: str) -> dict:
"""
Analyze structured data implementation.
Args:
industry (str): The industry to analyze
Returns:
dict: Structured data analysis results
"""
return ai_structured_data(industry)
def _extract_keywords(self, titles: dict, meta_analysis: dict) -> list:
"""
Extract keywords from titles and meta descriptions.
Args:
titles (dict): Generated titles
meta_analysis (dict): Meta description analysis
Returns:
list: Extracted keywords with metrics
"""
prompt = f"""
As an SEO expert, analyze the following content and extract relevant keywords with their metrics:
Titles: {titles}
Meta Descriptions: {meta_analysis}
Please provide a JSON response with the following structure:
{{
"keywords": [
{{
"keyword": "string",
"search_volume": "number",
"difficulty": "number",
"relevance_score": "number",
"content_type": "string"
}}
],
"summary": {{
"total_keywords": "number",
"high_opportunity_keywords": "number",
"recommended_focus_areas": ["string"]
}}
}}
Focus on:
1. Primary keywords and their variations
2. Long-tail keywords
3. Industry-specific terminology
4. Search volume and difficulty metrics
5. Content type recommendations
"""
try:
response = llm_text_gen(prompt, json_struct={
"type": "object",
"properties": {
"keywords": {
"type": "array",
"items": {
"type": "object",
"properties": {
"keyword": {"type": "string"},
"search_volume": {"type": "number"},
"difficulty": {"type": "number"},
"relevance_score": {"type": "number"},
"content_type": {"type": "string"}
}
}
},
"summary": {
"type": "object",
"properties": {
"total_keywords": {"type": "number"},
"high_opportunity_keywords": {"type": "number"},
"recommended_focus_areas": {
"type": "array",
"items": {"type": "string"}
}
}
}
}
})
return response
except Exception as e:
st.error(f"Error extracting keywords: {e}")
return []
def _analyze_search_intent(self, ai_insights: dict) -> dict:
"""
Analyze search intent from AI insights.
Args:
ai_insights (dict): AI-processed insights
Returns:
dict: Search intent analysis
"""
prompt = f"""
As an SEO expert, analyze the following content insights and determine the search intent:
Content Insights: {ai_insights}
Please provide a JSON response with the following structure:
{{
"informational": [
{{
"keyword": "string",
"intent_type": "string",
"content_suggestions": ["string"]
}}
],
"transactional": [
{{
"keyword": "string",
"intent_type": "string",
"content_suggestions": ["string"]
}}
],
"navigational": [
{{
"keyword": "string",
"intent_type": "string",
"content_suggestions": ["string"]
}}
],
"summary": {{
"dominant_intent": "string",
"content_strategy_recommendations": ["string"]
}}
}}
Focus on:
1. Identifying primary search intent for each keyword
2. Suggesting appropriate content types
3. Providing content strategy recommendations
4. Analyzing user behavior patterns
"""
try:
response = llm_text_gen(prompt, json_struct={
"type": "object",
"properties": {
"informational": {
"type": "array",
"items": {
"type": "object",
"properties": {
"keyword": {"type": "string"},
"intent_type": {"type": "string"},
"content_suggestions": {
"type": "array",
"items": {"type": "string"}
}
}
}
},
"transactional": {
"type": "array",
"items": {
"type": "object",
"properties": {
"keyword": {"type": "string"},
"intent_type": {"type": "string"},
"content_suggestions": {
"type": "array",
"items": {"type": "string"}
}
}
}
},
"navigational": {
"type": "array",
"items": {
"type": "object",
"properties": {
"keyword": {"type": "string"},
"intent_type": {"type": "string"},
"content_suggestions": {
"type": "array",
"items": {"type": "string"}
}
}
}
},
"summary": {
"type": "object",
"properties": {
"dominant_intent": {"type": "string"},
"content_strategy_recommendations": {
"type": "array",
"items": {"type": "string"}
}
}
}
}
})
return response
except Exception as e:
st.error(f"Error analyzing search intent: {e}")
return {
'informational': [],
'transactional': [],
'navigational': [],
'summary': {}
}
def _suggest_content_formats(self, ai_insights: dict) -> list:
"""
Suggest content formats based on AI insights.
Args:
ai_insights (dict): AI-processed insights
Returns:
list: Suggested content formats
"""
prompt = f"""
As a content strategy expert, analyze the following insights and suggest appropriate content formats:
AI Insights: {ai_insights}
Please provide a JSON response with the following structure:
{{
"content_formats": [
{{
"format": "string",
"description": "string",
"use_cases": ["string"],
"recommended_topics": ["string"],
"estimated_impact": "string"
}}
],
"format_strategy": {{
"primary_formats": ["string"],
"secondary_formats": ["string"],
"implementation_priority": ["string"]
}}
}}
Focus on:
1. Identifying the most effective content formats
2. Matching formats to user intent
3. Suggesting specific use cases
4. Providing implementation guidance
"""
try:
response = llm_text_gen(prompt, json_struct={
"type": "object",
"properties": {
"content_formats": {
"type": "array",
"items": {
"type": "object",
"properties": {
"format": {"type": "string"},
"description": {"type": "string"},
"use_cases": {
"type": "array",
"items": {"type": "string"}
},
"recommended_topics": {
"type": "array",
"items": {"type": "string"}
},
"estimated_impact": {"type": "string"}
}
}
},
"format_strategy": {
"type": "object",
"properties": {
"primary_formats": {
"type": "array",
"items": {"type": "string"}
},
"secondary_formats": {
"type": "array",
"items": {"type": "string"}
},
"implementation_priority": {
"type": "array",
"items": {"type": "string"}
}
}
}
}
})
return response
except Exception as e:
st.error(f"Error suggesting content formats: {e}")
return []
def _create_topic_clusters(self, ai_insights: dict) -> dict:
"""
Create topic clusters from AI insights.
Args:
ai_insights (dict): AI-processed insights
Returns:
dict: Topic clusters and relationships
"""
prompt = f"""
As a content organization expert, analyze the following insights and create topic clusters:
AI Insights: {ai_insights}
Please provide a JSON response with the following structure:
{{
"clusters": [
{{
"cluster_name": "string",
"main_topics": ["string"],
"subtopics": ["string"],
"related_keywords": ["string"],
"content_opportunities": ["string"]
}}
],
"relationships": {{
"cluster_connections": [
{{
"source": "string",
"target": "string",
"relationship_type": "string",
"strength": "number"
}}
],
"content_hierarchy": {{
"primary_topics": ["string"],
"secondary_topics": ["string"],
"tertiary_topics": ["string"]
}}
}}
}}
Focus on:
1. Identifying main topic clusters
2. Organizing subtopics and related keywords
3. Mapping relationships between clusters
4. Suggesting content opportunities
"""
try:
response = llm_text_gen(prompt, json_struct={
"type": "object",
"properties": {
"clusters": {
"type": "array",
"items": {
"type": "object",
"properties": {
"cluster_name": {"type": "string"},
"main_topics": {
"type": "array",
"items": {"type": "string"}
},
"subtopics": {
"type": "array",
"items": {"type": "string"}
},
"related_keywords": {
"type": "array",
"items": {"type": "string"}
},
"content_opportunities": {
"type": "array",
"items": {"type": "string"}
}
}
}
},
"relationships": {
"type": "object",
"properties": {
"cluster_connections": {
"type": "array",
"items": {
"type": "object",
"properties": {
"source": {"type": "string"},
"target": {"type": "string"},
"relationship_type": {"type": "string"},
"strength": {"type": "number"}
}
}
},
"content_hierarchy": {
"type": "object",
"properties": {
"primary_topics": {
"type": "array",
"items": {"type": "string"}
},
"secondary_topics": {
"type": "array",
"items": {"type": "string"}
},
"tertiary_topics": {
"type": "array",
"items": {"type": "string"}
}
}
}
}
}
}
})
return response
except Exception as e:
st.error(f"Error creating topic clusters: {e}")
return {
'clusters': [],
'relationships': {}
}
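The helpers above return whatever `llm_text_gen` produces, while their error paths return fixed fallbacks of a different shape in some cases. The diff does not show whether `llm_text_gen` returns a parsed dict or a JSON string, so the following is only a sketch under that uncertainty: a hypothetical `coerce_llm_json` helper that accepts either form and guarantees the expected top-level keys.

```python
import copy
import json
from typing import Any, Dict


def coerce_llm_json(response: Any, defaults: Dict[str, Any]) -> Dict[str, Any]:
    """Return a dict containing every key in `defaults`.

    Accepts a dict or a JSON string (both are assumptions about the model
    wrapper's return type). A key is taken from the response only when its
    value has the same type as the default; otherwise the default is kept.
    """
    if isinstance(response, str):
        try:
            response = json.loads(response)
        except json.JSONDecodeError:
            response = None
    if not isinstance(response, dict):
        response = {}

    # Deep copy so callers never share the mutable default containers
    result = copy.deepcopy(defaults)
    for key, default in defaults.items():
        value = response.get(key)
        if isinstance(value, type(default)):
            result[key] = value
    return result


CLUSTER_DEFAULTS = {"clusters": [], "relationships": {}}
parsed = coerce_llm_json('{"clusters": [{"cluster_name": "a"}]}', CLUSTER_DEFAULTS)
```

With a helper like this, the success path and the `except` path could return the same shape, so downstream code would not need to special-case a failed call.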

@@ -0,0 +1,361 @@
"""
Main module for content gap analysis.
"""
from typing import Dict, Any, List, Optional
import streamlit as st
from loguru import logger
from lib.utils.website_analyzer.analyzer import WebsiteAnalyzer
from .competitor_analyzer import CompetitorAnalyzer
from .keyword_researcher import KeywordResearcher
from .recommendation_engine import RecommendationEngine
from .utils.ai_processor import AIProcessor, ProgressTracker
from .utils.storage import ContentGapAnalysisStorage
from datetime import datetime
import asyncio
import sys
import os
from lib.gpt_providers.text_generation.main_text_generation import llm_text_gen
from .utils.content_parser import ContentParser
# Ensure the logs directory exists before any file handler is attached
os.makedirs("logs", exist_ok=True)
# Configure logger. loguru's logger is shared process-wide, so remove()
# with no arguments also drops handlers added by modules imported above.
logger.remove()
logger.add(
"logs/content_gap_analysis.log",
rotation="50 MB",
retention="10 days",
level="DEBUG",
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}"
)
logger.add(
sys.stdout,
level="INFO",
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{message}</cyan>"
)
class ContentGapAnalysis:
"""Main class for content gap analysis."""
def __init__(self, db_session=None):
"""Initialize the content gap analysis components."""
self.website_analyzer = WebsiteAnalyzer()
self.competitor_analyzer = CompetitorAnalyzer()
self.keyword_researcher = KeywordResearcher()
self.recommendation_engine = RecommendationEngine()
self.ai_processor = AIProcessor()
self.progress = ProgressTracker()
self.storage = ContentGapAnalysisStorage(db_session) if db_session else None
# Define analysis phases
self.phases = {
'website_analysis': {
'name': 'Website Analysis',
'steps': [
'Initializing website analysis',
'Analyzing website content',
'Evaluating SEO elements',
'Generating website insights'
]
},
'competitor_analysis': {
'name': 'Competitor Analysis',
'steps': [
'Initializing competitor analysis',
'Analyzing competitor content',
'Comparing market position',
'Generating competitive insights'
]
},
'keyword_analysis': {
'name': 'Keyword Analysis',
'steps': [
'Initializing keyword research',
'Analyzing keyword trends',
'Evaluating search intent',
'Generating keyword insights'
]
},
'recommendation_generation': {
'name': 'Recommendation Generation',
'steps': [
'Initializing recommendation engine',
'Analyzing content gaps',
'Generating recommendations',
'Creating implementation plan'
]
}
}
logger.info("ContentGapAnalysis initialized")
def analyze(self, url: str, industry: str, competitor_urls: Optional[List[str]] = None, user_id: Optional[int] = None) -> Dict[str, Any]:
"""
Run the complete content gap analysis workflow.
Args:
url: Target website URL
industry: Industry category
competitor_urls: Optional list of competitor URLs
user_id: Optional user ID for storing results
Returns:
Dictionary containing analysis results
"""
try:
results = {}
start_time = datetime.utcnow()
# Phase 1: Website Analysis
self.progress.start_stage('website_analysis')
self.progress.next_step()
website_analysis = self.website_analyzer.analyze(url)
results['website'] = website_analysis
self.progress.next_step()
self.progress.complete_stage()
# Phase 2: Competitor Analysis
if competitor_urls:
self.progress.start_stage('competitor_analysis')
self.progress.next_step()
competitor_analysis = self.competitor_analyzer.analyze(competitor_urls, industry)
results['competitors'] = competitor_analysis
self.progress.next_step()
self.progress.complete_stage()
# Phase 3: Keyword Analysis
self.progress.start_stage('keyword_analysis')
self.progress.next_step()
keyword_analysis = self.keyword_researcher.analyze(industry, url)
results['keywords'] = keyword_analysis
self.progress.next_step()
self.progress.complete_stage()
# Phase 4: Recommendation Generation
self.progress.start_stage('recommendation_generation')
self.progress.next_step()
recommendations = self.recommendation_engine.generate_recommendations(
website_analysis,
competitor_analysis if competitor_urls else None,
keyword_analysis
)
results['recommendations'] = recommendations
self.progress.next_step()
self.progress.complete_stage()
# Calculate analysis duration
end_time = datetime.utcnow()
results['duration'] = (end_time - start_time).total_seconds()
# Store results if user_id is provided and storage is available
if user_id and self.storage:
analysis_id = self.storage.save_analysis(user_id, url, industry, results)
if analysis_id:
results['analysis_id'] = analysis_id
return results
except Exception as e:
if self.progress.current_stage:
self.progress.update_progress(0, f"Error in {self.progress.stages[self.progress.current_stage]['name']}: {str(e)}")
st.error(f"Error in content gap analysis: {str(e)}")
# Fallback values are dicts so get_analysis_summary() can call .get() on them
return {
'error': str(e),
'website': {},
'competitors': {},
'keywords': {},
'recommendations': {}
}
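`analyze()` measures its duration by subtracting two `datetime.utcnow()` values. `datetime.utcnow()` is deprecated as of Python 3.12, and wall-clock differences can be skewed by system clock adjustments. A minimal sketch of an alternative, using a timezone-aware timestamp for the record and a monotonic clock for the elapsed time; the `timed` helper is hypothetical and not part of this commit:

```python
import time
from datetime import datetime, timezone
from typing import Any, Callable, Dict, Tuple


def timed(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Tuple[Any, Dict[str, Any]]:
    """Run fn and return (result, timing metadata)."""
    started_at = datetime.now(timezone.utc)  # timezone-aware UTC timestamp
    start = time.monotonic()                 # unaffected by clock adjustments
    result = fn(*args, **kwargs)
    duration = time.monotonic() - start
    return result, {
        "started_at": started_at.isoformat(),
        "duration": duration,
    }


total, timing = timed(sum, [1, 2, 3])
```

The same pattern could wrap each phase separately if per-phase timings are wanted in the results.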
def get_analysis(self, analysis_id: int) -> Optional[Dict[str, Any]]:
"""
Retrieve stored analysis results.
Args:
analysis_id: Analysis ID
Returns:
Dictionary containing analysis results if found, None otherwise
"""
if not self.storage:
st.error("Storage not initialized")
return None
return self.storage.get_analysis(analysis_id)
def get_user_analyses(self, user_id: int) -> List[Dict[str, Any]]:
"""
Get all analyses for a user.
Args:
user_id: User ID
Returns:
List of analysis summaries
"""
if not self.storage:
st.error("Storage not initialized")
return []
return self.storage.get_user_analyses(user_id)
def update_recommendation_status(self, recommendation_id: int, status: str) -> bool:
"""
Update the status of a recommendation.
Args:
recommendation_id: Recommendation ID
status: New status
Returns:
True if successful, False otherwise
"""
if not self.storage:
st.error("Storage not initialized")
return False
return self.storage.update_recommendation_status(recommendation_id, status)
def delete_analysis(self, analysis_id: int) -> bool:
"""
Delete an analysis and all related data.
Args:
analysis_id: Analysis ID
Returns:
True if successful, False otherwise
"""
if not self.storage:
st.error("Storage not initialized")
return False
return self.storage.delete_analysis(analysis_id)
def get_analysis_summary(self, results: Dict[str, Any]) -> Dict[str, Any]:
"""
Generate a summary of the analysis results.
Args:
results: Dictionary containing analysis results
Returns:
Dictionary containing summary metrics and insights
"""
try:
self.progress.start_stage('summary_generation')
self.progress.next_step()
summary = {
'website_metrics': self._summarize_website_metrics(results.get('website', {})),
'competitor_insights': self._summarize_competitor_insights(results.get('competitors', {})),
'keyword_opportunities': self._summarize_keyword_opportunities(results.get('keywords', {})),
'recommendation_highlights': self._summarize_recommendations(results.get('recommendations', {})),
'ai_insights': results.get('ai_insights', {})
}
self.progress.complete_stage()
return summary
except Exception as e:
if self.progress.current_stage:
self.progress.update_progress(0, f"Error generating summary: {str(e)}")
st.error(f"Error generating analysis summary: {str(e)}")
return {
'error': str(e),
'website_metrics': {},
'competitor_insights': {},
'keyword_opportunities': {},
'recommendation_highlights': {},
'ai_insights': {}
}
def export_results(self, results: Dict[str, Any], format: str = 'json') -> str:
"""
Export analysis results in the specified format.
Args:
results: Dictionary containing analysis results
format: Export format ('json' or 'csv')
Returns:
String containing exported results
"""
try:
self.progress.start_stage('export')
self.progress.next_step()
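if format.lower() == 'json':
import json
# default=str keeps non-JSON values such as datetimes from raising TypeError
exported = json.dumps(results, indent=2, default=str)
elif format.lower() == 'csv':
import pandas as pd
# Flatten nested dicts into a single row with dotted column names,
# so the key names survive to_csv(index=False)
df = pd.json_normalize(results)
exported = df.to_csv(index=False)
else:
raise ValueError(f"Unsupported export format: {format}")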
self.progress.complete_stage()
return exported
except Exception as e:
if self.progress.current_stage:
self.progress.update_progress(0, f"Error exporting results: {str(e)}")
st.error(f"Error exporting results: {str(e)}")
return str(e)
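The results dictionary is nested (website, competitors, keywords, recommendations), which does not map directly onto a flat CSV table. One possible approach, sketched here with the standard library only, is to flatten nested dicts into dotted field names and write one `field,value` row per leaf. `flatten` and `results_to_csv` are hypothetical helpers, not part of this commit:

```python
import csv
import io
import json
from typing import Any, Dict


def flatten(data: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Flatten nested dicts into a single dict keyed by dotted paths."""
    flat: Dict[str, Any] = {}
    for key, value in data.items():
        path = f"{prefix}{key}"
        if isinstance(value, dict) and value:
            flat.update(flatten(value, f"{path}."))
        else:
            flat[path] = value  # leaves, lists, and empty dicts are kept as-is
    return flat


def results_to_csv(results: Dict[str, Any]) -> str:
    """Serialize results as two-column CSV: dotted field name, value."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(["field", "value"])
    for field, value in flatten(results).items():
        # Lists and dicts are stored as JSON text so they fit in one cell
        cell = json.dumps(value) if isinstance(value, (list, dict)) else value
        writer.writerow([field, cell])
    return buffer.getvalue()


sample = {"duration": 1.5, "website": {"seo_score": 80, "insights": ["a", "b"]}}
csv_text = results_to_csv(sample)
```

A long (field, value) layout avoids having to decide how lists of different lengths should line up as columns.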
def _summarize_website_metrics(self, website_data: Dict[str, Any]) -> Dict[str, Any]:
"""Generate summary of website metrics."""
try:
return {
'content_score': website_data.get('content_score', 0),
'seo_score': website_data.get('seo_score', 0),
'structure_score': website_data.get('structure_score', 0),
'key_insights': website_data.get('insights', [])[:5] # Top 5 insights
}
except Exception as e:
st.error(f"Error summarizing website metrics: {str(e)}")
return {}
def _summarize_competitor_insights(self, competitor_data: Dict[str, Any]) -> Dict[str, Any]:
"""Generate summary of competitor insights."""
try:
return {
'market_position': competitor_data.get('market_position', {}),
'content_gaps': competitor_data.get('content_gaps', [])[:5], # Top 5 gaps
'competitive_advantages': competitor_data.get('advantages', [])[:5] # Top 5 advantages
}
except Exception as e:
st.error(f"Error summarizing competitor insights: {str(e)}")
return {}
def _summarize_keyword_opportunities(self, keyword_data: Dict[str, Any]) -> Dict[str, Any]:
"""Generate summary of keyword opportunities."""
try:
return {
'top_keywords': keyword_data.get('top_keywords', [])[:10], # Top 10 keywords
'search_intent': keyword_data.get('search_intent', {}),
'opportunities': keyword_data.get('opportunities', [])[:5] # Top 5 opportunities
}
except Exception as e:
st.error(f"Error summarizing keyword opportunities: {str(e)}")
return {}
def _summarize_recommendations(self, recommendation_data: Dict[str, Any]) -> Dict[str, Any]:
"""Generate summary of recommendations."""
try:
return {
'priority_recommendations': recommendation_data.get('priority_recommendations', [])[:5], # Top 5 recommendations
'implementation_timeline': recommendation_data.get('timeline', {}),
'expected_impact': recommendation_data.get('impact', {})
}
except Exception as e:
st.error(f"Error summarizing recommendations: {str(e)}")
return {}

@@ -0,0 +1,41 @@
"""
Navigation component for Content Gap Analysis tool.
"""
import streamlit as st
def show_content_gap_analysis_nav():
"""Show navigation for Content Gap Analysis tool."""
st.sidebar.title("Content Gap Analysis")
st.sidebar.markdown("""
Analyze your content strategy, identify gaps, and get AI-powered recommendations.
""")
# Navigation options
nav_option = st.sidebar.radio(
"Select Analysis Type",
["Website Analysis", "Competitor Analysis", "Keyword Research", "Recommendations"]
)
# Tool description
st.sidebar.markdown("""
### Features
- Website content analysis
- Competitor content comparison
- Keyword research and trends
- AI-powered recommendations
- Content gap identification
- Implementation timeline
""")
# Help section
with st.sidebar.expander("How to Use"):
st.markdown("""
1. Start with Website Analysis
2. Add competitor URLs
3. Research keywords
4. Get recommendations
5. Export results
""")
return nav_option

@@ -0,0 +1,440 @@
"""
Recommendation engine for content gap analysis.
"""
import streamlit as st
from typing import Dict, Any, List, Optional
from loguru import logger
from lib.utils.website_analyzer.analyzer import WebsiteAnalyzer
from lib.ai_seo_tools.content_gap_analysis.utils.data_collector import DataCollector
from lib.ai_seo_tools.content_gap_analysis.utils.content_parser import ContentParser
from lib.ai_seo_tools.content_gap_analysis.utils.ai_processor import AIProcessor, ProgressTracker
from lib.ai_seo_tools.content_title_generator import ai_title_generator
import asyncio
import sys
import os
import json
from lib.gpt_providers.text_generation.main_text_generation import llm_text_gen
# Ensure the logs directory exists before any file handler is attached
os.makedirs("logs", exist_ok=True)
# Configure logger. loguru's logger is shared process-wide, so remove()
# with no arguments also drops handlers added by modules imported above.
logger.remove()
logger.add(
"logs/recommendation_engine.log",
rotation="50 MB",
retention="10 days",
level="DEBUG",
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}"
)
logger.add(
sys.stdout,
level="INFO",
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{message}</cyan>"
)
class RecommendationEngine:
"""
Generates content recommendations based on analysis results.
"""
def __init__(self):
"""Initialize the recommendation engine with required components."""
self.ai_processor = AIProcessor()
self.progress = ProgressTracker()
# Define analysis stages
self.stages = {
'recommendation_generation': {
'name': 'Recommendation Generation',
'steps': [
'Initializing recommendation engine',
'Analyzing content gaps',
'Evaluating opportunities',
'Generating recommendations',
'Creating implementation plan'
]
}
}
def generate_recommendations(self, website_analysis: Dict[str, Any], competitor_analysis: Optional[Dict[str, Any]], keyword_analysis: Dict[str, Any]) -> Dict[str, Any]:
"""
Generate content recommendations.
Args:
website_analysis: Website analysis results
competitor_analysis: Optional competitor analysis results
keyword_analysis: Keyword analysis results
Returns:
Dictionary containing recommendations
"""
try:
self.progress.start_stage('recommendation_generation')
self.progress.next_step()
# Analyze content gaps
content_gaps = self._analyze_content_gaps(website_analysis, competitor_analysis, keyword_analysis)
self.progress.next_step()
# Evaluate opportunities
opportunities = self._evaluate_opportunities(content_gaps, keyword_analysis)
self.progress.next_step()
# Generate recommendations
recommendations = self._generate_recommendations(content_gaps, opportunities)
self.progress.next_step()
# Create implementation plan
implementation_plan = self._create_implementation_plan(recommendations)
self.progress.next_step()
self.progress.complete_stage()
return {
'content_gaps': content_gaps,
'opportunities': opportunities,
'recommendations': recommendations,
'implementation_plan': implementation_plan
}
except Exception as e:
if self.progress.current_stage:
self.progress.update_progress(0, f"Error in {self.progress.stages[self.progress.current_stage]['name']}: {str(e)}")
st.error(f"Error generating recommendations: {str(e)}")
return {
'error': str(e),
'content_gaps': [],
'opportunities': [],
'recommendations': [],
'implementation_plan': {}
}
def _analyze_content_gaps(self, website_analysis: Dict[str, Any], competitor_analysis: Optional[Dict[str, Any]], keyword_analysis: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Analyze content gaps."""
try:
content_gaps = []
# Analyze website content gaps
website_gaps = self._analyze_website_gaps(website_analysis)
content_gaps.extend(website_gaps)
# Analyze competitor gaps if available
if competitor_analysis:
competitor_gaps = self._analyze_competitor_gaps(competitor_analysis)
content_gaps.extend(competitor_gaps)
# Analyze keyword gaps
keyword_gaps = self._analyze_keyword_gaps(keyword_analysis)
content_gaps.extend(keyword_gaps)
return content_gaps
except Exception as e:
st.error(f"Error analyzing content gaps: {str(e)}")
return []
def _analyze_website_gaps(self, website_analysis: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Analyze website content gaps."""
try:
gaps = []
# Check content quality
quality_metrics = website_analysis.get('quality_metrics', {})
if quality_metrics.get('readability_score', 0) < 70:
gaps.append({
'type': 'content_quality',
'issue': 'Low readability score',
'score': quality_metrics.get('readability_score', 0),
'recommendation': 'Improve content readability'
})
# Check SEO elements
seo_metrics = website_analysis.get('seo_metrics', {})
if seo_metrics.get('seo_score', 0) < 70:
gaps.append({
'type': 'seo',
'issue': 'Low SEO score',
'score': seo_metrics.get('seo_score', 0),
'recommendation': 'Enhance SEO optimization'
})
return gaps
except Exception as e:
st.error(f"Error analyzing website gaps: {str(e)}")
return []
def _analyze_competitor_gaps(self, competitor_analysis: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Analyze competitor content gaps."""
try:
gaps = []
# Check content gaps
content_gaps = competitor_analysis.get('content_gaps', [])
for gap in content_gaps:
gaps.append({
'type': 'competitor',
'issue': f"Missing topic: {', '.join(gap.get('missing_topics', []))}",
'recommendation': 'Create content for missing topics'
})
return gaps
except Exception as e:
st.error(f"Error analyzing competitor gaps: {str(e)}")
return []
def _analyze_keyword_gaps(self, keyword_analysis: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Analyze keyword gaps."""
try:
gaps = []
# Check keyword opportunities
opportunities = keyword_analysis.get('opportunities', [])
for opportunity in opportunities:
gaps.append({
'type': 'keyword',
'issue': f"Keyword opportunity: {opportunity.get('keyword')}",
'volume': opportunity.get('volume', 0),
'difficulty': opportunity.get('difficulty', 0),
'recommendation': f"Target keyword: {opportunity.get('keyword')}"
})
return gaps
except Exception as e:
st.error(f"Error analyzing keyword gaps: {str(e)}")
return []
def _evaluate_opportunities(self, content_gaps: List[Dict[str, Any]], keyword_analysis: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Evaluate content opportunities."""
try:
opportunities = []
# Evaluate each gap
for gap in content_gaps:
# Calculate priority score
priority_score = self._calculate_priority_score(gap, keyword_analysis)
# Strictly greater than 50: a keyword gap (base score 50) passes only
# when its trend data yields a net positive adjustment
if priority_score > 50:
opportunities.append({
'type': gap.get('type'),
'issue': gap.get('issue'),
'recommendation': gap.get('recommendation'),
'priority_score': priority_score
})
# Sort by priority score
opportunities.sort(key=lambda x: x['priority_score'], reverse=True)
return opportunities
except Exception as e:
st.error(f"Error evaluating opportunities: {str(e)}")
return []
def _calculate_priority_score(self, gap: Dict[str, Any], keyword_analysis: Dict[str, Any]) -> float:
"""Calculate priority score for a gap."""
try:
base_score = 0
# Base score based on gap type
if gap.get('type') == 'content_quality':
base_score = 70
elif gap.get('type') == 'seo':
base_score = 80
elif gap.get('type') == 'competitor':
base_score = 60
elif gap.get('type') == 'keyword':
base_score = 50
# Adjust score based on keyword data
if gap.get('type') == 'keyword':
# Split once so keywords that themselves contain ': ' are kept whole
keyword = gap.get('issue', '').split(': ', 1)[-1]
keyword_data = keyword_analysis.get('trend_analysis', {}).get('trends', {}).get(keyword, {})
if keyword_data:
base_score += keyword_data.get('volume', 0) * 0.1
base_score -= keyword_data.get('difficulty', 0) * 0.2
return min(100, max(0, base_score))
except Exception as e:
st.error(f"Error calculating priority score: {str(e)}")
return 0
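A worked sketch of the priority scoring above may help when tuning the thresholds. It mirrors the base scores and the volume/difficulty adjustments as a standalone function; `priority_score`, `BASE_SCORES`, and the sample trend data are illustrative names and values, and the `trends` argument stands in for `keyword_analysis['trend_analysis']['trends']`:

```python
from typing import Any, Dict

BASE_SCORES = {"content_quality": 70, "seo": 80, "competitor": 60, "keyword": 50}


def priority_score(gap: Dict[str, Any], trends: Dict[str, Dict[str, Any]]) -> float:
    """Base score by gap type, adjusted by trend data for keyword gaps, clamped to 0-100."""
    score = BASE_SCORES.get(gap.get("type"), 0)
    if gap.get("type") == "keyword":
        # The keyword is recovered from the issue text, e.g. "Keyword opportunity: foo"
        keyword = gap.get("issue", "").split(": ", 1)[-1]
        data = trends.get(keyword, {})
        if data:
            score += data.get("volume", 0) * 0.1
            score -= data.get("difficulty", 0) * 0.2
    return min(100, max(0, score))


# Illustrative trend data only
trends = {
    "content audit": {"volume": 300, "difficulty": 40},
    "seo": {"volume": 1000, "difficulty": 90},
}
with_data = priority_score({"type": "keyword", "issue": "Keyword opportunity: content audit"}, trends)
no_data = priority_score({"type": "keyword", "issue": "Keyword opportunity: unknown term"}, trends)
clamped = priority_score({"type": "keyword", "issue": "Keyword opportunity: seo"}, trends)
```

Two consequences follow from the numbers: a keyword gap with no matching trend data scores exactly 50 and is filtered out by the `> 50` check in `_evaluate_opportunities`, and because volume is multiplied by 0.1, any keyword with a few hundred searches or more saturates at the cap of 100.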
def _generate_recommendations(self, content_gaps: List[Dict[str, Any]], opportunities: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Generate content recommendations."""
try:
recommendations = []
# Generate recommendations for each opportunity
for opportunity in opportunities:
recommendations.append({
'type': opportunity.get('type'),
'issue': opportunity.get('issue'),
'recommendation': opportunity.get('recommendation'),
'priority': opportunity.get('priority_score', 0),
'implementation_steps': self._generate_implementation_steps(opportunity)
})
return recommendations
except Exception as e:
st.error(f"Error generating recommendations: {str(e)}")
return []
def _generate_implementation_steps(self, opportunity: Dict[str, Any]) -> List[str]:
"""Generate implementation steps for a recommendation."""
try:
steps = []
if opportunity.get('type') == 'content_quality':
steps = [
'Review current content structure',
'Improve readability and formatting',
'Enhance content organization',
'Update content based on best practices'
]
elif opportunity.get('type') == 'seo':
steps = [
'Audit current SEO implementation',
'Optimize meta tags and descriptions',
'Improve content structure for SEO',
'Implement technical SEO improvements'
]
elif opportunity.get('type') == 'competitor':
steps = [
'Research competitor content',
'Identify unique value proposition',
'Create content for missing topics',
'Optimize content for target keywords'
]
elif opportunity.get('type') == 'keyword':
steps = [
'Research keyword intent',
'Create content strategy',
'Develop content for target keyword',
'Optimize content for search'
]
return steps
except Exception as e:
st.error(f"Error generating implementation steps: {str(e)}")
return []
def _create_implementation_plan(self, recommendations: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Create implementation plan."""
try:
plan = {
'phases': [],
'timeline': {},
'resources': {},
'success_metrics': {}
}
# Create phases based on recommendation types
phases = {
'content_quality': 'Content Enhancement',
'seo': 'SEO Optimization',
'competitor': 'Competitive Content',
'keyword': 'Keyword Targeting'
}
# Group recommendations by phase
for phase_name in phases.values():
phase_recommendations = [
rec for rec in recommendations
if phases.get(rec.get('type')) == phase_name
]
if phase_recommendations:
plan['phases'].append({
'name': phase_name,
'recommendations': phase_recommendations,
'duration': '2-4 weeks',
'resources': ['Content team', 'SEO team'],
'success_metrics': [
'Content quality score',
'SEO performance',
'User engagement'
]
})
return plan
except Exception as e:
st.error(f"Error creating implementation plan: {str(e)}")
return {}
def _generate_content_topics(self, ai_insights: dict) -> list:
"""
Generate content topic suggestions.
Args:
ai_insights (dict): AI-processed insights
Returns:
list: Content topic suggestions
"""
# TODO: Implement content topic generation
return []
def _suggest_content_formats(self, ai_insights: dict) -> list:
"""
Suggest content formats based on analysis.
Args:
ai_insights (dict): AI-processed insights
Returns:
list: Content format suggestions
"""
# TODO: Implement content format suggestions
return []
def _calculate_priority_scores(self, ai_insights: dict) -> dict:
"""
Calculate priority scores for recommendations.
Args:
ai_insights (dict): AI-processed insights
Returns:
dict: Priority scores for each recommendation
"""
# TODO: Implement priority scoring
return {}
def _create_timeline(self, ai_insights: dict) -> dict:
"""
Create implementation timeline for recommendations.
Args:
ai_insights (dict): AI-processed insights
Returns:
dict: Implementation timeline
"""
# TODO: Implement timeline creation
return {
'short_term': [],
'medium_term': [],
'long_term': []
}
def _generate_specific_suggestions(self, recommendations: dict, analysis_results: dict) -> dict:
"""
Generate specific content suggestions using existing tools.
Args:
recommendations (dict): General recommendations
analysis_results (dict): Analysis results
Returns:
dict: Specific content suggestions
"""
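suggestions = {}
# NOTE: this module imports only ai_title_generator; metadesc_generator_main
# and ai_structured_data must also be imported, or the calls below raise NameError.
# Generate titles, meta descriptions, and structured data for each suggested topic
for topic in recommendations.get('content_topics', []):
suggestions[topic] = {
'titles': ai_title_generator(topic),
'meta_descriptions': metadesc_generator_main(topic),
'structured_data': ai_structured_data(topic)
}
return suggestions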

View File

@@ -0,0 +1,769 @@
"""
Streamlit UI for Content Gap Analysis workflow.
"""
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import json
from datetime import datetime
from .main import ContentGapAnalysis
from .keyword_researcher import KeywordResearcher
from .competitor_analyzer import CompetitorAnalyzer
from .website_analyzer import WebsiteAnalyzer
from .recommendation_engine import RecommendationEngine
from .utils.ai_processor import AIProcessor
from .navigation import show_content_gap_analysis_nav
from typing import Dict, Any
import logging
# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
class ContentGapAnalysisUI:
"""Streamlit UI for Content Gap Analysis workflow."""
def __init__(self):
"""Initialize the UI components."""
# Initialize session state for progress tracking
if 'current_step' not in st.session_state:
st.session_state.current_step = 1
if 'analysis_results' not in st.session_state:
st.session_state.analysis_results = {}
# Initialize analysis components
self.analyzer = ContentGapAnalysis()
self.keyword_researcher = KeywordResearcher()
self.competitor_analyzer = CompetitorAnalyzer()
self.website_analyzer = WebsiteAnalyzer()
self.recommendation_engine = RecommendationEngine()
self.ai_processor = AIProcessor()
def run(self):
"""Run the Streamlit interface."""
try:
# Show navigation
nav_option = show_content_gap_analysis_nav()
# Main content area
st.title("Content Gap Analysis")
st.markdown("""
This tool helps you identify content gaps and opportunities by analyzing your website,
competitors, and market trends. Follow the steps below to get started.
""")
# Progress tracking
self._show_progress()
# Main workflow steps
if nav_option == "Website Analysis" or st.session_state.current_step == 1:
self._website_analysis_step()
elif nav_option == "Competitor Analysis" or st.session_state.current_step == 2:
self._competitor_analysis_step()
elif nav_option == "Keyword Research" or st.session_state.current_step == 3:
self._keyword_research_step()
elif nav_option == "Recommendations" or st.session_state.current_step == 4:
self._recommendations_step()
else:
self._export_results()
except Exception as e:
logger.error(f"Error in run method: {str(e)}", exc_info=True)
st.error(f"An error occurred: {str(e)}")
def _show_progress(self):
"""Display progress tracking."""
steps = [
"Website Analysis",
"Competitor Analysis",
"Keyword Research",
"Recommendations",
"Export Results"
]
progress = st.session_state.current_step / len(steps)
st.progress(progress)
cols = st.columns(len(steps))
for i, col in enumerate(cols):
with col:
if i + 1 < st.session_state.current_step:
st.success(f"{steps[i]}")
elif i + 1 == st.session_state.current_step:
st.info(f"{steps[i]}")
else:
st.text(f"{steps[i]}")
def _website_analysis_step(self):
"""Website analysis step UI."""
try:
st.header("Step 1: Website Analysis")
# Display previous results if they exist
if 'website' in st.session_state.analysis_results:
st.info("Previous analysis results found. You can analyze a new website or proceed to the next step.")
self._display_website_analysis(st.session_state.analysis_results['website'])
col1, col2 = st.columns(2)
with col1:
if st.button("Analyze New Website"):
st.session_state.analysis_results.pop('website', None)
st.rerun()
with col2:
if st.button("Proceed to Competitor Analysis"):
st.session_state.current_step = 2
st.rerun()
return
# Create form for new analysis
with st.form("website_analysis_form"):
website_url = st.text_input("Enter your website URL")
industry = st.text_input("Enter your industry/niche")
submitted = st.form_submit_button("Analyze Website")
# Handle form submission outside the form
if submitted and website_url and industry:
# Reset progress tracking so a previous run's status is not displayed
st.session_state.analysis_progress = {
'status': 'initializing',
'current_step': 'Starting Analysis',
'progress': 0,
'details': 'Initializing analysis...'
}
# Create progress container
progress_container = st.empty()
status_container = st.empty()
details_container = st.empty()
# Update progress display
def update_progress_display():
progress = st.session_state.analysis_progress
# Update progress bar
with progress_container:
st.progress(progress['progress'] / 100)
# Update status
with status_container:
if progress['status'] == 'error':
st.error(f"Error: {progress['current_step']}")
elif progress['status'] == 'completed':
st.success(f"{progress['current_step']}")
else:
st.info(f"{progress['current_step']}")
# Update details
with details_container:
st.write(progress['details'])
# Initial progress display
update_progress_display()
try:
# Get basic analysis
results = self.website_analyzer.analyze(website_url)
# Update progress from analyzer
st.session_state.analysis_progress = self.website_analyzer.progress.get_progress()
update_progress_display()
if isinstance(results, dict) and 'error' in results:
st.error(f"Error in website analysis: {results['error']}")
return
# Get AI-enhanced analysis
st.session_state.analysis_progress.update({
'current_step': 'AI Analysis',
'progress': 95,
'details': 'Performing AI-enhanced analysis...'
})
update_progress_display()
ai_analysis = self.ai_processor.analyze_content({
'url': website_url,
'industry': industry,
'content': results
})
# Combine results
if isinstance(results, dict):
results.update(ai_analysis)
else:
results = {'error': 'Invalid analysis results format'}
# Store results in session state
st.session_state.analysis_results['website'] = results
# Update final progress
st.session_state.analysis_progress.update({
'status': 'completed',
'current_step': 'Analysis Complete',
'progress': 100,
'details': 'Analysis completed successfully!'
})
update_progress_display()
# Display results
self._display_website_analysis(results)
except Exception as e:
logger.error(f"Error during website analysis: {str(e)}", exc_info=True)
st.session_state.analysis_progress.update({
'status': 'error',
'current_step': 'Analysis Failed',
'details': f"Error during website analysis: {str(e)}"
})
update_progress_display()
st.error(f"Error during website analysis: {str(e)}")
return
except Exception as e:
logger.error(f"Error in website analysis step: {str(e)}", exc_info=True)
st.error(f"Error in website analysis: {str(e)}")
def _display_website_analysis(self, results: Dict[str, Any]):
"""Display website analysis results."""
try:
if not isinstance(results, dict):
st.error("Invalid analysis results format")
return
if 'error' in results:
st.error(f"Error in analysis: {results['error']}")
return
# Content Metrics
st.subheader("Content Metrics")
content_metrics = results.get('content_metrics', {})
if content_metrics:
# Basic metrics in columns
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("Word Count", f"{content_metrics.get('word_count', 0):,}")
with col2:
st.metric("Headings", f"{content_metrics.get('heading_count', 0):,}")
with col3:
st.metric("Images", f"{content_metrics.get('image_count', 0):,}")
with col4:
st.metric("Links", f"{content_metrics.get('link_count', 0):,}")
# Content Structure Visualization
st.write("Content Structure")
heading_data = {
'Type': ['H1', 'H2', 'H3', 'Paragraphs'],
'Count': [
content_metrics.get('h1_count', 0),
content_metrics.get('h2_count', 0),
content_metrics.get('h3_count', 0),
content_metrics.get('paragraph_count', 0)
]
}
fig = px.bar(
heading_data,
x='Type',
y='Count',
title="Content Structure Distribution",
color='Type',
color_discrete_sequence=px.colors.qualitative.Set3
)
st.plotly_chart(fig, use_container_width=True)
# Content Features
st.write("Content Features")
features = {
'Feature': ['Meta Description', 'Robots.txt', 'Sitemap'],
'Status': [
content_metrics.get('has_meta_description', False),
content_metrics.get('has_robots_txt', False),
content_metrics.get('has_sitemap', False)
]
}
fig = px.bar(
features,
x='Feature',
y='Status',
title="Content Features Status",
color='Status',
color_discrete_sequence=['red', 'green']
)
st.plotly_chart(fig, use_container_width=True)
# SEO Metrics
st.subheader("SEO Metrics")
seo_metrics = results.get('seo_metrics', {})
if seo_metrics:
# Basic metrics in columns
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("Overall Score", f"{seo_metrics.get('overall_score', 0):.1f}%")
with col2:
content_quality = seo_metrics.get('content', {}).get('content_quality_score', 0)
st.metric("Content Quality", f"{content_quality:.1f}%")
with col3:
readability = seo_metrics.get('content', {}).get('readability_score', 0)
st.metric("Readability", f"{readability:.1f}%")
with col4:
keyword_density = seo_metrics.get('content', {}).get('keyword_density', 0)
st.metric("Keyword Density", f"{keyword_density:.1f}%")
# SEO Scores Radar Chart
seo_scores = {
'Metric': ['Overall', 'Content Quality', 'Readability', 'Keyword Density'],
'Score': [
seo_metrics.get('overall_score', 0),
content_quality,
readability,
keyword_density
]
}
fig = px.line_polar(
seo_scores,
r='Score',
theta='Metric',
line_close=True,
title="SEO Performance Overview"
)
fig.update_traces(fill='toself')
st.plotly_chart(fig, use_container_width=True)
# Meta Tags Analysis
st.write("Meta Tags Analysis")
meta_tags = seo_metrics.get('meta_tags', {})
if meta_tags:
# Title Analysis
title = meta_tags.get('title', {})
st.write("Title Tag")
st.write(f"Status: {'Good' if title.get('status') == 'good' else 'Needs attention'}")
st.write(f"Value: {title.get('value', 'N/A')}")
st.write(f"Length: {title.get('length', 0)} characters")
st.write(f"Score: {title.get('score', 0)}%")
if title.get('recommendation'):
st.warning(title.get('recommendation'))
# Description Analysis
desc = meta_tags.get('description', {})
st.write("Meta Description")
st.write(f"Status: {'Good' if desc.get('status') == 'good' else 'Needs attention'}")
st.write(f"Value: {desc.get('value', 'N/A')}")
st.write(f"Length: {desc.get('length', 0)} characters")
st.write(f"Score: {desc.get('score', 0)}%")
if desc.get('recommendation'):
st.warning(desc.get('recommendation'))
# Keywords Analysis
keywords = meta_tags.get('keywords', {})
st.write("Meta Keywords")
st.write(f"Status: {'Good' if keywords.get('status') == 'good' else 'Needs attention'}")
st.write(f"Value: {keywords.get('value', 'N/A')}")
if keywords.get('recommendation'):
st.warning(keywords.get('recommendation'))
# Technical Metrics
st.subheader("Technical Metrics")
technical_info = results.get('technical_info', {})
if technical_info:
col1, col2 = st.columns(2)
with col1:
st.write("Basic Information")
st.metric("Status Code", technical_info.get('status_code', 'N/A'))
st.metric("Server", technical_info.get('server_info', {}).get('server', 'N/A'))
st.metric("Content Type", technical_info.get('server_info', {}).get('content_type', 'N/A'))
with col2:
st.write("Security Information")
security_info = technical_info.get('security_info', {})
security_data = {
'Feature': ['SSL', 'HSTS', 'XSS Protection'],
'Status': [
security_info.get('ssl', False),
security_info.get('hsts', False),
security_info.get('xss_protection', False)
]
}
fig = px.bar(
security_data,
x='Feature',
y='Status',
title="Security Features Status",
color='Status',
color_discrete_sequence=['red', 'green']
)
st.plotly_chart(fig, use_container_width=True)
# Performance Metrics
st.subheader("Performance Metrics")
performance = results.get('performance', {})
if performance:
# Basic metrics in columns
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("Load Time", f"{performance.get('load_time', 0):.2f}s")
with col2:
st.metric("Page Size", f"{performance.get('page_size', 0):.1f} KB")
with col3:
st.metric("Status Code", performance.get('status_code', 'N/A'))
with col4:
st.metric("Response Time", f"{performance.get('response_time', 0):.2f}s")
# Insights and Recommendations
st.subheader("Insights and Recommendations")
insights = results.get('insights', [])
if insights:
for insight in insights:
st.info(f"{insight}")
else:
st.info("No specific insights available")
except Exception as e:
logger.error(f"Error displaying website analysis: {str(e)}", exc_info=True)
st.error(f"Error displaying website analysis: {str(e)}")
def _competitor_analysis_step(self):
"""Competitor analysis step UI."""
try:
st.header("Step 2: Competitor Analysis")
with st.form("competitor_analysis_form"):
competitors = st.text_area(
"Enter competitor URLs (one per line)",
help="Enter the URLs of your main competitors"
)
submitted = st.form_submit_button("Analyze Competitors")
if submitted and competitors:
with st.spinner("Analyzing competitors..."):
competitor_urls = [url.strip() for url in competitors.split('\n') if url.strip()]
results = self.competitor_analyzer.analyze(competitor_urls)
# Get AI-enhanced competitor analysis
ai_analysis = self.ai_processor.analyze_competitors({
'competitors': competitor_urls,
'analysis': results
})
# Combine results
results.update(ai_analysis)
st.session_state.analysis_results['competitors'] = results
# Display results
self._display_competitor_analysis(results)
# Move to next step
st.session_state.current_step = 3
st.rerun()
except Exception as e:
logger.error(f"Error in competitor analysis step: {str(e)}", exc_info=True)
st.error(f"Error in competitor analysis: {str(e)}")
def _display_competitor_analysis(self, results: dict):
"""Display competitor analysis results."""
st.subheader("Competitor Analysis Results")
# Competitor comparison
st.subheader("Competitor Comparison")
comp_data = pd.DataFrame(results.get('comparison', []))
if not comp_data.empty:
fig = px.bar(
comp_data,
x='competitor',
y='score',
color='metric',
title="Competitor Comparison"
)
st.plotly_chart(fig)
# AI-Enhanced Competitor Analysis
st.subheader("AI-Enhanced Competitor Analysis")
# Competitor Trend Analysis
trend_data = results.get('competitor_trends', {})
if trend_data:
fig = go.Figure()
for competitor, trends in trend_data.items():
fig.add_trace(go.Scatter(
x=trends.get('timeline', []),
y=trends.get('scores', []),
name=competitor,
mode='lines+markers'
))
fig.update_layout(
title="Competitor Performance Trends",
xaxis_title="Timeline",
yaxis_title="Score"
)
st.plotly_chart(fig)
# Content gaps
st.subheader("Content Gaps")
gaps = results.get('content_gaps', [])
for gap in gaps:
st.info(f"{gap}")
# AI-Generated Competitive Insights
st.subheader("Competitive Insights")
insights = results.get('competitive_insights', {})
if insights:
for category, points in insights.items():
with st.expander(f"{category.title()} Analysis"):
for point in points:
st.success(f"{point}")
def _keyword_research_step(self):
"""Keyword research step UI."""
try:
st.header("Step 3: Keyword Research")
with st.form("keyword_research_form"):
industry = st.text_input(
"Enter your industry/niche",
value=st.session_state.analysis_results.get('website', {}).get('industry', '')
)
submitted = st.form_submit_button("Research Keywords")
if submitted and industry:
with st.spinner("Researching keywords..."):
results = self.keyword_researcher.research(industry)
# Get AI-enhanced keyword analysis
ai_analysis = self.ai_processor.analyze_keywords({
'industry': industry,
'keywords': results
})
# Combine results
results.update(ai_analysis)
st.session_state.analysis_results['keywords'] = results
# Display results
self._display_keyword_research(results)
# Move to next step
st.session_state.current_step = 4
st.rerun()
except Exception as e:
logger.error(f"Error in keyword research step: {str(e)}", exc_info=True)
st.error(f"Error in keyword research: {str(e)}")
def _display_keyword_research(self, results: dict):
"""Display keyword research results."""
st.subheader("Keyword Research Results")
# Keyword metrics
st.subheader("Keyword Metrics")
keyword_data = pd.DataFrame(results.get('keywords', []))
if not keyword_data.empty:
fig = px.scatter(
keyword_data,
x='search_volume',
y='difficulty',
size='relevance_score',
hover_data=['keyword'],
title="Keyword Opportunities"
)
st.plotly_chart(fig)
# AI-Enhanced Keyword Analysis
st.subheader("AI-Enhanced Keyword Analysis")
# Keyword Trend Analysis
trend_data = results.get('keyword_trends', {})
if trend_data:
fig = go.Figure()
for keyword, trends in trend_data.items():
fig.add_trace(go.Scatter(
x=trends.get('timeline', []),
y=trends.get('scores', []),
name=keyword,
mode='lines+markers'
))
fig.update_layout(
title="Keyword Trend Analysis",
xaxis_title="Timeline",
yaxis_title="Trend Score"
)
st.plotly_chart(fig)
# Search intent distribution
st.subheader("Search Intent Distribution")
intent_data = pd.DataFrame(results.get('search_intent', {}).get('summary', {}))
if not intent_data.empty:
fig = px.pie(
intent_data,
values='count',
names='intent',
title="Search Intent Distribution"
)
st.plotly_chart(fig)
# Content format suggestions
st.subheader("Content Format Suggestions")
formats = results.get('content_formats', [])
for content_format in formats:
st.info(f"{content_format}")
# AI-Generated Keyword Insights
st.subheader("Keyword Insights")
insights = results.get('keyword_insights', {})
if insights:
for category, points in insights.items():
with st.expander(f"{category.title()} Insights"):
for point in points:
st.success(f"{point}")
def _recommendations_step(self):
"""Recommendations step UI."""
try:
st.header("Step 4: Content Recommendations")
with st.spinner("Generating recommendations..."):
results = self.recommendation_engine.generate_recommendations(
st.session_state.analysis_results
)
# Get AI-enhanced recommendations
ai_recommendations = self.ai_processor.analyze_recommendations({
'recommendations': results,
'analysis': st.session_state.analysis_results
})
# Combine results
results.update(ai_recommendations)
st.session_state.analysis_results['recommendations'] = results
# Display results
self._display_recommendations(results)
# Move to next step
st.session_state.current_step = 5
st.rerun()
except Exception as e:
logger.error(f"Error in recommendations step: {str(e)}", exc_info=True)
st.error(f"Error in recommendations: {str(e)}")
def _display_recommendations(self, results: dict):
"""Display content recommendations."""
st.subheader("Content Recommendations")
# Priority recommendations
st.subheader("Priority Recommendations")
priorities = results.get('priorities', [])
for priority in priorities:
st.success(f"{priority}")
# AI-Enhanced Recommendations
st.subheader("AI-Enhanced Recommendations")
# Recommendation Impact Analysis
impact_data = results.get('impact_analysis', {})
if impact_data:
fig = go.Figure()
for metric, values in impact_data.items():
fig.add_trace(go.Bar(
name=metric,
x=values.get('categories', []),
y=values.get('scores', [])
))
fig.update_layout(
title="Recommendation Impact Analysis",
xaxis_title="Categories",
yaxis_title="Impact Score",
barmode='group'
)
st.plotly_chart(fig)
# Implementation timeline
st.subheader("Implementation Timeline")
timeline = results.get('timeline', [])
for item in timeline:
st.info(f"{item}")
# Expected impact
st.subheader("Expected Impact")
impact = results.get('impact', {})
for metric, value in impact.items():
st.metric(metric, value)
# AI-Generated Strategic Insights
st.subheader("Strategic Insights")
insights = results.get('strategic_insights', {})
if insights:
for category, points in insights.items():
with st.expander(f"{category.title()} Strategy"):
for point in points:
st.success(f"{point}")
def _export_results(self):
"""Export results step UI."""
st.header("Step 5: Export Results")
# Export options
export_format = st.radio(
"Choose export format",
["JSON", "CSV", "PDF"]
)
if st.button("Export Results"):
if export_format == "JSON":
self._export_json()
elif export_format == "CSV":
self._export_csv()
else:
st.info("PDF export coming soon!")
def _export_json(self):
"""Export results as JSON."""
results = st.session_state.analysis_results
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"content_gap_analysis_{timestamp}.json"
st.download_button(
"Download JSON",
data=json.dumps(results, indent=2, default=str),
file_name=filename,
mime="application/json"
)
    def _export_csv(self):
        """Export results as CSV."""
        results = st.session_state.analysis_results
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # Flatten results into rows. Build new dicts so the results stored in
        # session state are not mutated with an extra 'section' key.
        csv_data = []
        for section, data in results.items():
            if isinstance(data, list):
                for item in data:
                    if isinstance(item, dict):
                        csv_data.append({**item, 'section': section})
            elif isinstance(data, dict):
                csv_data.append({**data, 'section': section})
        if csv_data:
            df = pd.DataFrame(csv_data)
            filename = f"content_gap_analysis_{timestamp}.csv"
            st.download_button(
                "Download CSV",
                data=df.to_csv(index=False),
                file_name=filename,
                mime="text/csv"
            )
def main():
"""Main entry point for the Streamlit app."""
ui = ContentGapAnalysisUI()
ui.run()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,249 @@
# Content Gap Analysis Utils
This directory contains utility modules that power the Content Gap Analysis tool. These modules provide core functionality for data collection, processing, analysis, and storage.
## Directory Structure
```
utils/
├── README.md
├── ai_processor.py # AI-powered content analysis and processing
├── content_parser.py # Content structure parsing and analysis
├── data_collector.py # Website data collection and processing
└── storage.py # Analysis results storage and retrieval
```
## Module Descriptions
### 1. AI Processor (`ai_processor.py`)
The AI Processor module enhances content analysis using AI techniques. It provides intelligent analysis of website content, competitor data, and keyword research.
#### Key Features:
- Content quality assessment
- Topic analysis and clustering
- Performance metrics analysis
- Strategic recommendations generation
- Progress tracking for analysis tasks
#### Main Components:
- `AIProcessor`: Main class for AI-powered analysis
- `ProgressTracker`: Tracks analysis progress and status
#### Usage Example:
```python
from utils.ai_processor import AIProcessor
processor = AIProcessor()
analysis = processor.analyze_content({
'url': 'https://example.com',
'industry': 'technology',
'content': content_data
})
```
### 2. Content Parser (`content_parser.py`)
The Content Parser module handles the parsing and analysis of website content structure. It provides detailed insights into content organization and quality.
#### Key Features:
- Content structure analysis
- Text statistics calculation
- Topic extraction
- Readability analysis
- Content hierarchy analysis
#### Main Components:
- `ContentParser`: Main class for content parsing and analysis
#### Usage Example:
```python
from utils.content_parser import ContentParser
parser = ContentParser()
structure = parser.parse_structure({
'main_content': content,
'html': html_content,
'headings': headings_data
})
```
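
The readability analysis in `content_parser.py` is based on the Flesch Reading Ease formula. The sketch below illustrates that calculation on its own. It is a simplified stand-in, not the module's code: it assumes plain regex splitting instead of NLTK tokenization, and the function names `count_syllables` and `flesch_reading_ease` are hypothetical.

```python
import re


def count_syllables(word: str) -> int:
    """Estimate syllables by counting vowel groups (a heuristic, not a dictionary lookup)."""
    word = word.lower()
    groups = len(re.findall(r"[aeiouy]+", word))
    if word.endswith("e") and groups > 1:
        groups -= 1  # treat a trailing 'e' as silent
    return max(groups, 1)


def flesch_reading_ease(text: str) -> float:
    """Return the Flesch Reading Ease score, clamped to the 0-100 range."""
    # Assumption: regex splitting stands in for NLTK's sentence and word tokenizers
    sentences = [s for s in re.split(r"[.!?]+", text) if s.strip()]
    words = re.findall(r"[A-Za-z0-9]+", text)
    if not words:
        return 0.0
    words_per_sentence = len(words) / max(len(sentences), 1)
    syllables_per_word = sum(count_syllables(w) for w in words) / len(words)
    score = 206.835 - 1.015 * words_per_sentence - 84.6 * syllables_per_word
    return max(0.0, min(100.0, score))
```

Higher scores indicate easier text. Very short, simple sentences can exceed 100 before clamping, which is why the result is bounded.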
### 3. Data Collector (`data_collector.py`)
The Data Collector module is responsible for gathering website data for analysis. It handles web scraping and data extraction.
#### Key Features:
- Website content collection
- Metadata extraction
- Heading structure analysis
- Link and image extraction
- Error handling and retry logic
#### Main Components:
- `DataCollector`: Main class for data collection
#### Usage Example:
```python
from utils.data_collector import DataCollector
collector = DataCollector()
data = collector.collect('https://example.com')
```
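
The feature list above mentions retry logic. One possible shape for it is a small retry helper with exponential backoff, sketched below using only the standard library. The helper name `retry_with_backoff` and its parameters are assumptions for illustration and are not part of `DataCollector`.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func until it succeeds or attempts run out, doubling the delay each time."""
    for attempt in range(attempts):
        try:
            return func()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * (2 ** attempt))
    raise ValueError("attempts must be at least 1")
```

A call such as `retry_with_backoff(lambda: collector.collect(url))` would only retry if the wrapped call raises on failure. If the collector returns an error dictionary instead, the wrapper would need to inspect the result.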
### 4. Storage (`storage.py`)
The Storage module manages the persistence and retrieval of analysis results. It provides a robust database interface for storing and accessing analysis data.
#### Key Features:
- Analysis results storage
- Historical data management
- Recommendation tracking
- User-specific analysis storage
- Error handling and rollback support
#### Main Components:
- `ContentGapAnalysisStorage`: Main class for storage operations
#### Usage Example:
```python
from utils.storage import ContentGapAnalysisStorage
storage = ContentGapAnalysisStorage(db_session)
analysis_id = storage.save_analysis(
user_id=1,
website_url='https://example.com',
industry='technology',
results=analysis_results
)
```
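
To illustrate what rollback support means in practice, here is a minimal sketch that uses the standard-library `sqlite3` module as a stand-in for the project's database session. The table names, columns, and the `init_schema` and `save_analysis` functions are hypothetical and do not reflect the actual `ContentGapAnalysisStorage` schema.

```python
import sqlite3
from typing import List, Optional


def init_schema(conn: sqlite3.Connection) -> None:
    """Create two illustrative tables (hypothetical schema)."""
    with conn:
        conn.execute(
            "CREATE TABLE analyses ("
            "id INTEGER PRIMARY KEY, user_id INTEGER, website_url TEXT NOT NULL)"
        )
        conn.execute(
            "CREATE TABLE recommendations ("
            "id INTEGER PRIMARY KEY, analysis_id INTEGER, text TEXT NOT NULL)"
        )


def save_analysis(
    conn: sqlite3.Connection,
    user_id: int,
    website_url: str,
    recommendations: List[Optional[str]],
) -> int:
    """Save an analysis and its recommendations as one transaction."""
    # 'with conn' commits on success and rolls back if any statement raises,
    # so a failed recommendation insert also discards the analysis row.
    with conn:
        cursor = conn.execute(
            "INSERT INTO analyses (user_id, website_url) VALUES (?, ?)",
            (user_id, website_url),
        )
        analysis_id = cursor.lastrowid
        conn.executemany(
            "INSERT INTO recommendations (analysis_id, text) VALUES (?, ?)",
            [(analysis_id, text) for text in recommendations],
        )
    return analysis_id
```

Grouping related writes in one transaction keeps the stored analysis and its recommendations consistent: either both are saved or neither is.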
## Integration Points
### 1. Website Analysis Integration
```python
from utils.data_collector import DataCollector
from utils.content_parser import ContentParser
from utils.ai_processor import AIProcessor
# Collect data
collector = DataCollector()
data = collector.collect(url)
# Parse content
parser = ContentParser()
structure = parser.parse_structure(data)
# Process with AI
processor = AIProcessor()
analysis = processor.analyze_content({
'url': url,
'content': structure
})
```
### 2. Storage Integration
```python
from utils.storage import ContentGapAnalysisStorage
# Store analysis results
storage = ContentGapAnalysisStorage(db_session)
analysis_id = storage.save_analysis(
user_id=user_id,
website_url=url,
industry=industry,
results=analysis_results
)
# Retrieve analysis
results = storage.get_analysis(analysis_id)
```
## Error Handling
The modules are designed to handle the following categories of errors:
1. **Data Collection Errors**
- Network timeouts
- Invalid URLs
- Access restrictions
- Parsing errors
2. **Processing Errors**
- Invalid data formats
- AI processing failures
- Resource limitations
- Analysis timeouts
3. **Storage Errors**
- Database connection issues
- Transaction failures
- Data validation errors
- Concurrent access conflicts
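
One pattern visible in this codebase is returning a dictionary with an `error` key instead of raising, so the UI can check `'error' in results`. The sketch below shows a generic wrapper for that pattern. The function name `run_step` is an assumption for illustration and is not an existing helper in these modules.

```python
from typing import Any, Callable, Dict


def run_step(name: str, step: Callable[[], Dict[str, Any]]) -> Dict[str, Any]:
    """Run one analysis step and convert any failure into an 'error' entry."""
    try:
        result = step()
    except Exception as exc:
        return {"error": f"{name} failed: {exc}"}
    if not isinstance(result, dict):
        # Callers expect a dict, so report unexpected return types explicitly
        return {"error": f"{name} returned {type(result).__name__}, expected dict"}
    return result
```

Callers can then branch on the presence of the `error` key without wrapping every call in its own `try` block.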
## Best Practices
1. **Data Collection**
- Implement rate limiting
- Use proper user agents
- Handle redirects
- Validate input data
2. **Content Processing**
- Clean and normalize data
- Handle encoding issues
- Implement fallback strategies
- Cache processed results
3. **Storage Management**
- Use transactions
- Implement data validation
- Handle concurrent access
- Maintain data integrity
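
Two of the data collection practices above, input validation and rate limiting, can be sketched with the standard library alone. The names `is_valid_url` and `RateLimiter` are hypothetical and are not part of the existing modules. The injectable `clock` and `sleep` parameters are an assumption made so the limiter can be tested without real delays.

```python
import time
from typing import Callable, Optional
from urllib.parse import urlparse


def is_valid_url(url: str) -> bool:
    """Accept only absolute http(s) URLs that include a host."""
    try:
        parsed = urlparse(url.strip())
    except ValueError:
        return False
    return parsed.scheme in ("http", "https") and bool(parsed.netloc)


class RateLimiter:
    """Enforce a minimum interval between consecutive calls."""

    def __init__(
        self,
        min_interval: float,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        self.min_interval = min_interval
        self._clock = clock
        self._sleep = sleep
        self._last_call: Optional[float] = None

    def wait(self) -> float:
        """Sleep if the previous call was too recent; return the seconds waited."""
        waited = 0.0
        if self._last_call is not None:
            remaining = self.min_interval - (self._clock() - self._last_call)
            if remaining > 0:
                self._sleep(remaining)
                waited = remaining
        self._last_call = self._clock()
        return waited
```

Calling `limiter.wait()` before each request keeps requests to a single host at least `min_interval` seconds apart.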
## Future Enhancements
1. **Performance Optimizations**
- Implement parallel processing
- Add caching layer
- Optimize database queries
- Enhance error recovery
2. **Feature Additions**
- Content performance tracking
- Automated content planning
- Enhanced competitive intelligence
- Advanced topic clustering
3. **Integration Improvements**
- API endpoints
- Export capabilities
- Data visualization
- Progress tracking
4. **UI/UX Enhancements**
- Interactive visualizations
- Real-time progress updates
- Export interfaces
- Customization options
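
As a rough illustration of the parallel processing idea listed above, the sketch below analyzes several URLs concurrently with a thread pool. It is not an existing feature: `analyze_in_parallel` and its `analyze_one` callback are hypothetical names, and a thread pool is only one possible approach for I/O-bound work.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List


def analyze_in_parallel(
    urls: List[str],
    analyze_one: Callable[[str], Dict[str, Any]],
    max_workers: int = 4,
) -> Dict[str, Dict[str, Any]]:
    """Analyze URLs concurrently, recording per-URL errors instead of stopping."""

    def safe(url: str) -> Dict[str, Any]:
        try:
            return analyze_one(url)
        except Exception as exc:
            return {"error": str(exc)}

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves input order, so results line up with urls
        results = list(pool.map(safe, urls))
    return dict(zip(urls, results))
```

Catching exceptions per URL means one unreachable competitor site does not discard the results for the others.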
## Contributing
When contributing to these utility modules:
1. Follow the existing code structure
2. Add comprehensive error handling
3. Include unit tests
4. Update documentation
5. Follow PEP 8 style guide
## Dependencies
- BeautifulSoup4: HTML parsing
- NLTK: Natural language processing
- SQLAlchemy: Database operations
- Streamlit: UI components
- Requests: HTTP requests
## License
This project is licensed under the MIT License - see the LICENSE file for details.

View File

@@ -0,0 +1,13 @@
"""
Utility modules for content gap analysis.
"""
from .data_collector import DataCollector
from .content_parser import ContentParser
from .ai_processor import AIProcessor
__all__ = [
'DataCollector',
'ContentParser',
'AIProcessor'
]

File diff suppressed because it is too large

View File

@@ -0,0 +1,236 @@
"""
Content parser utility for analyzing website content structure.
"""
from typing import Dict, Any, List
import re
from bs4 import BeautifulSoup
import nltk
from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.corpus import stopwords
from collections import Counter
class ContentParser:
"""Parser for analyzing website content structure."""
def __init__(self):
"""Initialize the content parser."""
try:
nltk.data.find('tokenizers/punkt')
except LookupError:
nltk.download('punkt')
try:
nltk.data.find('corpora/stopwords')
except LookupError:
nltk.download('stopwords')
self.stop_words = set(stopwords.words('english'))
def parse_structure(self, content: Dict[str, Any]) -> Dict[str, Any]:
"""
Parse and analyze the structure of website content.
Args:
content: Dictionary containing website content
Returns:
Dictionary containing parsed content structure
"""
try:
# Parse main content
main_content = content.get('main_content', '')
soup = BeautifulSoup(content.get('html', ''), 'html.parser')
# Extract text statistics
text_stats = self._analyze_text(main_content)
# Extract content sections
sections = self._extract_sections(soup)
# Extract topics
topics = self._extract_topics(main_content)
# Analyze readability
readability = self._analyze_readability(main_content)
# Analyze content hierarchy
hierarchy = self._analyze_hierarchy(content.get('headings', []))
return {
'text_statistics': text_stats,
'sections': sections,
'topics': topics,
'readability': readability,
'hierarchy': hierarchy,
'metadata': content.get('metadata', {})
}
except Exception as e:
return {
'error': str(e),
'text_statistics': {},
'sections': [],
'topics': [],
'readability': {},
'hierarchy': {},
'metadata': {}
}
    def _analyze_text(self, text: str) -> Dict[str, Any]:
        """Analyze text statistics."""
        sentences = sent_tokenize(text)
        # Tokenize once and reuse the tokens for both counts below
        tokens = word_tokenize(text.lower())
        # Content words: alphanumeric tokens that are not stop words
        words = [w for w in tokens if w.isalnum() and w not in self.stop_words]
        return {
            'word_count': len(words),
            'sentence_count': len(sentences),
            'average_sentence_length': len(words) / max(len(sentences), 1),
            'unique_words': len(set(words)),
            'stop_words': len([w for w in tokens if w in self.stop_words]),
            'characters': len(text),
            # Ignore empty chunks so blank text reports zero paragraphs
            'paragraphs': len([p for p in text.split('\n\n') if p.strip()]),
            'sentences': sentences
        }
def _extract_sections(self, soup: BeautifulSoup) -> List[Dict[str, Any]]:
"""Extract content sections."""
sections = []
# Find main content containers
containers = soup.find_all(['article', 'section', 'div'], class_=re.compile(r'content|main|article|section'))
for container in containers:
# Get section heading
heading = container.find(['h1', 'h2', 'h3'])
heading_text = heading.get_text().strip() if heading else 'Untitled Section'
# Get section content
content = container.get_text().strip()
# Get section type
section_type = container.name
if container.get('class'):
section_type = ' '.join(container.get('class'))
sections.append({
'heading': heading_text,
'content': content,
'type': section_type,
'word_count': len(word_tokenize(content)),
'position': self._get_element_position(container)
})
return sections
def _extract_topics(self, text: str) -> List[Dict[str, Any]]:
"""Extract main topics from content."""
# Tokenize and clean text
words = word_tokenize(text.lower())
words = [w for w in words if w.isalnum() and w not in self.stop_words]
# Get word frequencies
word_freq = Counter(words)
# Get top topics
topics = []
for word, freq in word_freq.most_common(10):
topics.append({
'topic': word,
'frequency': freq,
'percentage': freq / len(words) * 100
})
return topics
def _analyze_readability(self, text: str) -> Dict[str, float]:
"""Analyze text readability."""
sentences = sent_tokenize(text)
words = word_tokenize(text.lower())
words = [w for w in words if w.isalnum()]
# Calculate average sentence length
avg_sentence_length = len(words) / max(len(sentences), 1)
# Calculate average word length
avg_word_length = sum(len(w) for w in words) / max(len(words), 1)
# Calculate Flesch Reading Ease score
# Formula: 206.835 - 1.015(total words/total sentences) - 84.6(total syllables/total words)
syllables = sum(self._count_syllables(w) for w in words)
flesch_score = 206.835 - 1.015 * avg_sentence_length - 84.6 * (syllables / max(len(words), 1))
return {
'flesch_score': max(0, min(100, flesch_score)),
'avg_sentence_length': avg_sentence_length,
'avg_word_length': avg_word_length,
'syllables_per_word': syllables / max(len(words), 1)
}
def _analyze_hierarchy(self, headings: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analyze content hierarchy."""
# Group headings by level
heading_levels = {}
for heading in headings:
level = heading['level']
if level not in heading_levels:
heading_levels[level] = []
heading_levels[level].append(heading)
# Calculate hierarchy metrics
total_headings = len(headings)
max_depth = max(int(level[1]) for level in heading_levels.keys()) if heading_levels else 0
return {
'total_headings': total_headings,
'max_depth': max_depth,
'heading_distribution': {level: len(headings) for level, headings in heading_levels.items()},
'has_proper_hierarchy': self._check_proper_hierarchy(heading_levels)
}
def _check_proper_hierarchy(self, heading_levels: Dict[str, List[Dict[str, Any]]]) -> bool:
"""Check if headings follow proper hierarchy."""
if not heading_levels:
return False
# Check if h1 exists
if 'h1' not in heading_levels:
return False
# Check if h1 is unique
if len(heading_levels['h1']) > 1:
return False
# Check if levels are sequential
levels = sorted(int(level[1]) for level in heading_levels.keys())
return all(levels[i] - levels[i-1] <= 1 for i in range(1, len(levels)))
    def _count_syllables(self, word: str) -> int:
        """Estimate the number of syllables in a word using a vowel-group heuristic."""
        word = word.lower()
        if not word:
            return 0
        vowels = 'aeiouy'
        # Count the start of each vowel group
        count = 1 if word[0] in vowels else 0
        for index in range(1, len(word)):
            if word[index] in vowels and word[index - 1] not in vowels:
                count += 1
        # A trailing 'e' is usually silent
        if word.endswith('e'):
            count -= 1
        # Every non-empty word has at least one syllable
        return max(count, 1)
def _get_element_position(self, element) -> Dict[str, int]:
"""Get element position in the document."""
try:
return {
'top': element.sourceline,
'left': element.sourcepos
}
except AttributeError:
return {
'top': 0,
'left': 0
}
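
The readability helper above computes Flesch Reading Ease as 206.835 - 1.015 × (words / sentences) - 84.6 × (syllables / words), clamped to the 0-100 range. A minimal standalone sketch of the same calculation follows. It assumes a regex-based tokenizer in place of NLTK's `word_tokenize` and `sent_tokenize`, so counts may differ slightly from the analyzer's; `count_syllables` and `flesch_reading_ease` are illustrative names, not part of the module.

```python
import re

VOWELS = "aeiouy"


def count_syllables(word: str) -> int:
    """Heuristic syllable count: vowel groups, minus a trailing silent 'e'."""
    word = word.lower()
    if not word:
        return 0
    count = 1 if word[0] in VOWELS else 0
    for i in range(1, len(word)):
        if word[i] in VOWELS and word[i - 1] not in VOWELS:
            count += 1
    if word.endswith("e"):
        count -= 1
    return max(count, 1)


def flesch_reading_ease(text: str) -> float:
    """Flesch Reading Ease, clamped to [0, 100]."""
    # Assumption: simple regex splitting stands in for the NLTK tokenizers.
    sentences = [s for s in re.split(r"[.!?]+", text) if s.strip()]
    words = re.findall(r"[A-Za-z0-9]+", text.lower())
    if not words:
        return 0.0
    words_per_sentence = len(words) / max(len(sentences), 1)
    syllables_per_word = sum(count_syllables(w) for w in words) / len(words)
    score = 206.835 - 1.015 * words_per_sentence - 84.6 * syllables_per_word
    return max(0.0, min(100.0, score))


if __name__ == "__main__":
    print(flesch_reading_ease("Short sentences are easy to read. Long ones are not."))
```

Higher scores indicate easier text. The clamp matters in practice because very short, monosyllabic input pushes the raw formula above 100.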

@@ -0,0 +1,112 @@
"""
Data collector utility for content gap analysis.
"""
import requests
from bs4 import BeautifulSoup
from typing import Dict, Any
class DataCollector:
"""
Collects and processes website data for analysis.
"""
def __init__(self):
"""Initialize the data collector."""
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
def collect(self, url: str) -> Dict[str, Any]:
"""
Collect website data for analysis.
Args:
url (str): The URL to collect data from
Returns:
dict: Collected website data
"""
try:
# Fetch webpage content
response = requests.get(url, headers=self.headers, timeout=10)
response.raise_for_status()
# Parse HTML content
soup = BeautifulSoup(response.text, 'html.parser')
# Extract relevant data
data = {
'url': url,
'title': self._extract_title(soup),
'meta_description': self._extract_meta_description(soup),
'headings': self._extract_headings(soup),
'content': self._extract_content(soup),
'links': self._extract_links(soup),
'images': self._extract_images(soup)
}
return data
except Exception as e:
return {
'error': str(e),
'url': url
}
def _extract_title(self, soup: BeautifulSoup) -> str:
"""Extract page title."""
title = soup.find('title')
return title.text if title else ''
def _extract_meta_description(self, soup: BeautifulSoup) -> str:
"""Extract meta description."""
meta = soup.find('meta', attrs={'name': 'description'})
return meta.get('content', '') if meta else ''
def _extract_headings(self, soup: BeautifulSoup) -> Dict[str, list]:
"""Extract all headings."""
headings = {}
for i in range(1, 7):
tags = soup.find_all(f'h{i}')
headings[f'h{i}'] = [tag.text.strip() for tag in tags]
return headings
def _extract_content(self, soup: BeautifulSoup) -> str:
"""Extract main content."""
# Remove script and style elements
for script in soup(['script', 'style']):
script.decompose()
# Get text content
text = soup.get_text()
# Clean up text
lines = (line.strip() for line in text.splitlines())
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
text = ' '.join(chunk for chunk in chunks if chunk)
return text
def _extract_links(self, soup: BeautifulSoup) -> list:
"""Extract all links."""
links = []
for link in soup.find_all('a'):
href = link.get('href')
if href:
links.append({
'url': href,
'text': link.text.strip()
})
return links
def _extract_images(self, soup: BeautifulSoup) -> list:
"""Extract all images."""
images = []
for img in soup.find_all('img'):
images.append({
'src': img.get('src', ''),
'alt': img.get('alt', ''),
'title': img.get('title', '')
})
return images
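
`collect()` returns either the full data dictionary or a two-key `{'error', 'url'}` dictionary, so callers need to check for the error case before reading fields. A minimal sketch of a consumer follows, assuming a hypothetical helper `summarize_collected` (not part of `DataCollector`) and the key names shown in `collect()` above.

```python
from typing import Any, Dict


def summarize_collected(data: Dict[str, Any]) -> Dict[str, Any]:
    """Summarize a collect() result, or surface its error."""
    if "error" in data:
        return {"url": data.get("url", ""), "ok": False, "error": data["error"]}
    headings = data.get("headings", {})
    images = data.get("images", [])
    return {
        "url": data.get("url", ""),
        "ok": True,
        "word_count": len(data.get("content", "").split()),
        # Keep only heading levels that actually occur on the page
        "heading_counts": {lvl: len(items) for lvl, items in headings.items() if items},
        "link_count": len(data.get("links", [])),
        "images_missing_alt": sum(1 for img in images if not img.get("alt")),
    }


if __name__ == "__main__":
    failed = {"error": "connection timed out", "url": "https://example.com"}
    print(summarize_collected(failed))
```

Checking for `'error'` first keeps a failed fetch from being reported as a page with zero words and zero links.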

@@ -0,0 +1,237 @@
"""
SEO analyzer utility for content gap analysis.
"""
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin
import re
from typing import Dict, Any, List, Optional
from ....utils.website_analyzer.analyzer import WebsiteAnalyzer
def analyze_onpage_seo(url: str) -> Dict[str, Any]:
"""
Analyze on-page SEO elements of a website.
Args:
url: The URL to analyze
Returns:
Dictionary containing SEO analysis results
"""
try:
# Use the combined website analyzer
analyzer = WebsiteAnalyzer()
analysis = analyzer.analyze_website(url)
if not analysis.get('success', False):
return {
'error': analysis.get('error', 'Unknown error in SEO analysis'),
'meta_title': '',
'meta_description': '',
'has_robots_txt': False,
'has_sitemap': False,
'mobile_friendly': False,
'load_time': 0
}
# Extract relevant information from the analysis
seo_info = analysis['data']['analysis']['seo_info']
basic_info = analysis['data']['analysis']['basic_info']
performance = analysis['data']['analysis']['performance']
return {
'meta_tags': seo_info.get('meta_tags', {}),
'content': seo_info.get('content', {}),
'meta_title': basic_info.get('title', ''),
'meta_description': basic_info.get('meta_description', ''),
'has_robots_txt': bool(basic_info.get('robots_txt')),
'has_sitemap': bool(basic_info.get('sitemap')),
'mobile_friendly': True,  # Placeholder: not measured here; a real mobile-friendliness check still needs to be implemented
'load_time': performance.get('load_time', 0)
}
except Exception as e:
return {
'error': str(e),
'meta_title': '',
'meta_description': '',
'has_robots_txt': False,
'has_sitemap': False,
'mobile_friendly': False,
'load_time': 0
}
def _analyze_meta_tags(soup: BeautifulSoup) -> Dict[str, Any]:
    """Analyze meta tags of the webpage."""
    meta_tags = {}
    # Title tag (get_text() is safe even when <title> is empty or has nested markup)
    title_tag = soup.find('title')
    if title_tag:
        meta_tags['title'] = title_tag.get_text().strip()
    # Meta description
    meta_desc = soup.find('meta', {'name': 'description'})
    if meta_desc:
        meta_tags['description'] = meta_desc.get('content', '').strip()
    # Meta keywords
    meta_keywords = soup.find('meta', {'name': 'keywords'})
    if meta_keywords:
        meta_tags['keywords'] = meta_keywords.get('content', '').strip()
    # Open Graph tags
    og_tags = {}
    for tag in soup.find_all('meta', property=re.compile(r'^og:')):
        og_tags[tag['property']] = tag.get('content', '')
    meta_tags['og_tags'] = og_tags
    # Twitter Card tags; the 'name' attribute must be passed through attrs because
    # find_all()'s first parameter is itself called 'name'
    twitter_tags = {}
    for tag in soup.find_all('meta', attrs={'name': re.compile(r'^twitter:')}):
        twitter_tags[tag['name']] = tag.get('content', '')
    meta_tags['twitter_tags'] = twitter_tags
    return meta_tags
def _analyze_headings(soup: BeautifulSoup) -> Dict[str, Any]:
"""Analyze heading structure of the webpage."""
headings = {
'h1': [],
'h2': [],
'h3': [],
'h4': [],
'h5': [],
'h6': []
}
for tag in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']:
for heading in soup.find_all(tag):
headings[tag].append(heading.get_text().strip())
return headings
def _analyze_content(soup: BeautifulSoup) -> Dict[str, Any]:
"""Analyze main content of the webpage."""
# Find main content
main_content = soup.find('main') or soup.find('article') or soup.find('div', class_=re.compile(r'content|main|article'))
if not main_content:
return {
'word_count': 0,
'paragraph_count': 0,
'content': ''
}
# Get text content
content = main_content.get_text()
# Count words and paragraphs
words = content.split()
paragraphs = main_content.find_all('p')
return {
'word_count': len(words),
'paragraph_count': len(paragraphs),
'content': content
}
def _analyze_links(soup: BeautifulSoup, base_url: str) -> Dict[str, Any]:
"""Analyze links on the webpage."""
links = {
'internal': [],
'external': [],
'broken': []
}
base_domain = urlparse(base_url).netloc
for link in soup.find_all('a', href=True):
href = link['href']
# Handle relative URLs
if not href.startswith(('http://', 'https://')):
href = urljoin(base_url, href)
# Categorize link
if urlparse(href).netloc == base_domain:
links['internal'].append({
'url': href,
'text': link.get_text().strip(),
'title': link.get('title', '')
})
else:
links['external'].append({
'url': href,
'text': link.get_text().strip(),
'title': link.get('title', '')
})
return links
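
`_analyze_links` resolves each `href` against the base URL and then compares hostnames to separate internal from external links. A minimal sketch of that classification using only `urllib.parse` follows. The `classify_links` name and the extra `skipped` bucket for non-HTTP schemes (such as `mailto:`) are assumptions added for illustration; the function above files such links under `external`.

```python
from typing import Dict, List
from urllib.parse import urljoin, urlparse


def classify_links(hrefs: List[str], base_url: str) -> Dict[str, List[str]]:
    """Split hrefs into internal, external, and skipped (non-HTTP) links."""
    base_domain = urlparse(base_url).netloc
    result: Dict[str, List[str]] = {"internal": [], "external": [], "skipped": []}
    for href in hrefs:
        # urljoin leaves absolute URLs untouched and resolves relative ones
        absolute = urljoin(base_url, href)
        parsed = urlparse(absolute)
        if parsed.scheme not in ("http", "https"):
            result["skipped"].append(href)
        elif parsed.netloc == base_domain:
            result["internal"].append(absolute)
        else:
            result["external"].append(absolute)
    return result


if __name__ == "__main__":
    links = ["/about", "post-1", "https://other.org/x", "mailto:hi@example.com"]
    print(classify_links(links, "https://example.com/blog/"))
```

Because `urljoin` already handles absolute URLs, the explicit `startswith(('http://', 'https://'))` check is not strictly needed before calling it.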
def _analyze_images(soup: BeautifulSoup) -> Dict[str, Any]:
"""Analyze images on the webpage."""
images = []
for img in soup.find_all('img'):
image_data = {
'src': img.get('src', ''),
'alt': img.get('alt', ''),
'title': img.get('title', ''),
'width': img.get('width', ''),
'height': img.get('height', ''),
'has_alt': bool(img.get('alt')),
'has_title': bool(img.get('title')),
'has_dimensions': bool(img.get('width') and img.get('height'))
}
images.append(image_data)
return {
'total': len(images),
'with_alt': sum(1 for img in images if img['has_alt']),
'with_title': sum(1 for img in images if img['has_title']),
'with_dimensions': sum(1 for img in images if img['has_dimensions']),
'images': images
}
def _check_technical_elements(soup: BeautifulSoup, url: str) -> Dict[str, Any]:
    """Check technical SEO elements."""
    base_url = urlparse(url)
    domain = base_url.netloc
    # Check robots.txt
    robots_url = f"{base_url.scheme}://{domain}/robots.txt"
    try:
        robots_response = requests.get(robots_url, timeout=5)
        has_robots_txt = robots_response.status_code == 200
    except requests.RequestException:
        has_robots_txt = False
    # Check sitemap
    sitemap_url = f"{base_url.scheme}://{domain}/sitemap.xml"
    try:
        sitemap_response = requests.get(sitemap_url, timeout=5)
        has_sitemap = sitemap_response.status_code == 200
    except requests.RequestException:
        has_sitemap = False
    # Check mobile friendliness (a viewport meta tag is only a rough proxy)
    viewport = soup.find('meta', {'name': 'viewport'})
    has_viewport = bool(viewport)
    # Check canonical URL
    canonical = soup.find('link', {'rel': 'canonical'})
    has_canonical = bool(canonical)
    # Check language; guard against documents without an <html> element
    html_tag = soup.find('html')
    html_lang = html_tag.get('lang', '') if html_tag else ''
    has_language = bool(html_lang)
return {
'has_robots_txt': has_robots_txt,
'has_sitemap': has_sitemap,
'mobile_friendly': has_viewport,
'has_canonical': has_canonical,
'has_language': has_language,
'language': html_lang
}

@@ -0,0 +1,270 @@
"""
Storage module for content gap analysis results.
"""
from typing import Dict, Any, List, Optional
from datetime import datetime
from sqlalchemy.orm import Session
from sqlalchemy.exc import SQLAlchemyError
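import streamlit as st
# NOTE: ContentGapAnalysis, WebsiteAnalysis, CompetitorAnalysis, KeywordAnalysis,
# ContentRecommendation, and AnalysisHistory are used below but are not imported in this file;
# they must be imported from the project's ORM models module for this code to run.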
class ContentGapAnalysisStorage:
"""Handles storage and retrieval of content gap analysis results."""
def __init__(self, db_session: Session):
"""Initialize the storage handler."""
self.db = db_session
def save_analysis(self, user_id: int, website_url: str, industry: str, results: Dict[str, Any]) -> Optional[int]:
"""
Save content gap analysis results.
Args:
user_id: User ID
website_url: Target website URL
industry: Industry category
results: Analysis results dictionary
Returns:
Analysis ID if successful, None otherwise
"""
try:
# Create main analysis record
analysis = ContentGapAnalysis(
user_id=user_id,
website_url=website_url,
industry=industry,
status='completed',
metadata={'version': '1.0'}
)
self.db.add(analysis)
self.db.flush() # Get the ID without committing
# Save website analysis
website_analysis = WebsiteAnalysis(
content_gap_analysis_id=analysis.id,
content_score=results.get('website', {}).get('content_score', 0),
seo_score=results.get('website', {}).get('seo_score', 0),
structure_score=results.get('website', {}).get('structure_score', 0),
content_metrics=results.get('website', {}).get('content_metrics', {}),
seo_metrics=results.get('website', {}).get('seo_metrics', {}),
technical_metrics=results.get('website', {}).get('technical_metrics', {}),
ai_insights=results.get('website', {}).get('ai_insights', {})
)
self.db.add(website_analysis)
# Save competitor analysis if available
if 'competitors' in results:
for competitor in results['competitors']:
competitor_analysis = CompetitorAnalysis(
content_gap_analysis_id=analysis.id,
competitor_url=competitor.get('url'),
market_position=competitor.get('market_position', {}),
content_gaps=competitor.get('content_gaps', []),
competitive_advantages=competitor.get('competitive_advantages', []),
trend_analysis=competitor.get('trend_analysis', {})
)
self.db.add(competitor_analysis)
# Save keyword analysis
keyword_analysis = KeywordAnalysis(
content_gap_analysis_id=analysis.id,
top_keywords=results.get('keywords', {}).get('top_keywords', []),
search_intent=results.get('keywords', {}).get('search_intent', {}),
opportunities=results.get('keywords', {}).get('opportunities', []),
trend_analysis=results.get('keywords', {}).get('trend_analysis', {})
)
self.db.add(keyword_analysis)
# Save recommendations
for recommendation in results.get('recommendations', []):
content_recommendation = ContentRecommendation(
content_gap_analysis_id=analysis.id,
recommendation_type=recommendation.get('type'),
priority_score=recommendation.get('priority_score', 0),
recommendation=recommendation.get('recommendation', ''),
implementation_steps=recommendation.get('implementation_steps', []),
expected_impact=recommendation.get('expected_impact', {}),
status='pending'
)
self.db.add(content_recommendation)
# Save analysis history
history = AnalysisHistory(
content_gap_analysis_id=analysis.id,
status='completed',
metrics={'duration': results.get('duration', 0)}
)
self.db.add(history)
# Commit all changes
self.db.commit()
return analysis.id
except SQLAlchemyError as e:
self.db.rollback()
st.error(f"Error saving analysis results: {str(e)}")
return None
def get_analysis(self, analysis_id: int) -> Optional[Dict[str, Any]]:
"""
Retrieve content gap analysis results.
Args:
analysis_id: Analysis ID
Returns:
Dictionary containing analysis results if found, None otherwise
"""
try:
analysis = self.db.get(ContentGapAnalysis, analysis_id)  # Session.get() replaces the legacy Query.get() (SQLAlchemy 1.4+)
if not analysis:
return None
# Get website analysis
website_analysis = self.db.query(WebsiteAnalysis).filter_by(
content_gap_analysis_id=analysis_id
).first()
# Get competitor analysis
competitor_analyses = self.db.query(CompetitorAnalysis).filter_by(
content_gap_analysis_id=analysis_id
).all()
# Get keyword analysis
keyword_analysis = self.db.query(KeywordAnalysis).filter_by(
content_gap_analysis_id=analysis_id
).first()
# Get recommendations
recommendations = self.db.query(ContentRecommendation).filter_by(
content_gap_analysis_id=analysis_id
).all()
# Get analysis history
history = self.db.query(AnalysisHistory).filter_by(
content_gap_analysis_id=analysis_id
).order_by(AnalysisHistory.run_date.desc()).all()
return {
'id': analysis.id,
'website_url': analysis.website_url,
'industry': analysis.industry,
'analysis_date': analysis.analysis_date,
'status': analysis.status,
'website': {
'content_score': website_analysis.content_score,
'seo_score': website_analysis.seo_score,
'structure_score': website_analysis.structure_score,
'content_metrics': website_analysis.content_metrics,
'seo_metrics': website_analysis.seo_metrics,
'technical_metrics': website_analysis.technical_metrics,
'ai_insights': website_analysis.ai_insights
} if website_analysis else {},
'competitors': [{
'url': ca.competitor_url,
'market_position': ca.market_position,
'content_gaps': ca.content_gaps,
'competitive_advantages': ca.competitive_advantages,
'trend_analysis': ca.trend_analysis
} for ca in competitor_analyses],
'keywords': {
'top_keywords': keyword_analysis.top_keywords,
'search_intent': keyword_analysis.search_intent,
'opportunities': keyword_analysis.opportunities,
'trend_analysis': keyword_analysis.trend_analysis
} if keyword_analysis else {},
'recommendations': [{
'type': r.recommendation_type,
'priority_score': r.priority_score,
'recommendation': r.recommendation,
'implementation_steps': r.implementation_steps,
'expected_impact': r.expected_impact,
'status': r.status
} for r in recommendations],
'history': [{
'run_date': h.run_date,
'status': h.status,
'metrics': h.metrics,
'error_log': h.error_log
} for h in history]
}
except SQLAlchemyError as e:
st.error(f"Error retrieving analysis results: {str(e)}")
return None
def get_user_analyses(self, user_id: int) -> List[Dict[str, Any]]:
"""
Get all analyses for a user.
Args:
user_id: User ID
Returns:
List of analysis summaries
"""
try:
analyses = self.db.query(ContentGapAnalysis).filter_by(
user_id=user_id
).order_by(ContentGapAnalysis.analysis_date.desc()).all()
return [{
'id': analysis.id,
'website_url': analysis.website_url,
'industry': analysis.industry,
'analysis_date': analysis.analysis_date,
'status': analysis.status
} for analysis in analyses]
except SQLAlchemyError as e:
st.error(f"Error retrieving user analyses: {str(e)}")
return []
def update_recommendation_status(self, recommendation_id: int, status: str) -> bool:
"""
Update the status of a recommendation.
Args:
recommendation_id: Recommendation ID
status: New status
Returns:
True if successful, False otherwise
"""
try:
recommendation = self.db.get(ContentRecommendation, recommendation_id)  # Session.get() replaces the legacy Query.get() (SQLAlchemy 1.4+)
if recommendation:
recommendation.status = status
recommendation.updated_at = datetime.utcnow()
self.db.commit()
return True
return False
except SQLAlchemyError as e:
self.db.rollback()
st.error(f"Error updating recommendation status: {str(e)}")
return False
def delete_analysis(self, analysis_id: int) -> bool:
"""
Delete an analysis and all related data.
Args:
analysis_id: Analysis ID
Returns:
True if successful, False otherwise
"""
try:
analysis = self.db.get(ContentGapAnalysis, analysis_id)  # Session.get() replaces the legacy Query.get() (SQLAlchemy 1.4+)
if analysis:
self.db.delete(analysis)
self.db.commit()
return True
return False
except SQLAlchemyError as e:
self.db.rollback()
st.error(f"Error deleting analysis: {str(e)}")
return False
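
`save_analysis` reads the `results` dictionary through chained `.get()` calls with defaults, so missing sections do not raise. A minimal sketch of the shape it expects follows, assuming a hypothetical helper `normalize_results` (not part of the storage module) that applies the same defaults to a subset of the fields before anything is written.

```python
from typing import Any, Dict


def normalize_results(results: Dict[str, Any]) -> Dict[str, Any]:
    """Apply the defaults save_analysis() uses to a raw results dict."""
    website = results.get("website", {})
    keywords = results.get("keywords", {})
    return {
        "website": {
            "content_score": website.get("content_score", 0),
            "seo_score": website.get("seo_score", 0),
            "structure_score": website.get("structure_score", 0),
        },
        "competitors": [
            {"url": c.get("url"), "content_gaps": c.get("content_gaps", [])}
            for c in results.get("competitors", [])
        ],
        "keywords": {
            "top_keywords": keywords.get("top_keywords", []),
            "opportunities": keywords.get("opportunities", []),
        },
        "recommendations": [
            {
                "type": r.get("type"),
                "priority_score": r.get("priority_score", 0),
                "recommendation": r.get("recommendation", ""),
                "status": "pending",  # new recommendations always start as pending
            }
            for r in results.get("recommendations", [])
        ],
        "duration": results.get("duration", 0),
    }


if __name__ == "__main__":
    print(normalize_results({"website": {"seo_score": 72}}))
```

Normalizing up front makes it easier to test the expected payload without a database session.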

@@ -0,0 +1,291 @@
"""Website analyzer module for content gap analysis."""
import streamlit as st
from loguru import logger
from typing import Dict, Any, List, Optional
import asyncio
import sys
import os
import json
from lib.utils.website_analyzer.analyzer import WebsiteAnalyzer as BaseWebsiteAnalyzer
from lib.gpt_providers.text_generation.main_text_generation import llm_text_gen
# Configure logger
logger.remove() # Remove default handler
logger.add(
"logs/content_gap_website_analyzer.log",
rotation="50 MB",
retention="10 days",
level="DEBUG",
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}"
)
logger.add(
sys.stdout,
level="INFO",
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{message}</cyan>"
)
# Ensure logs directory exists
os.makedirs("logs", exist_ok=True)
class WebsiteAnalyzer(BaseWebsiteAnalyzer):
"""Extended website analyzer for content gap analysis."""
def __init__(self):
"""Initialize the website analyzer."""
super().__init__()
logger.info("ContentGapWebsiteAnalyzer initialized")
def analyze_content_gaps(self, url: str, competitor_urls: List[str]) -> Dict[str, Any]:
"""
Analyze content gaps between the target website and competitors.
Args:
url: The target URL to analyze
competitor_urls: List of competitor URLs to compare against
Returns:
Dictionary containing content gap analysis results
"""
try:
# Analyze target website
target_analysis = self.analyze_website(url)
if not target_analysis.get('success', False):
return {
'error': target_analysis.get('error', 'Unknown error in target analysis'),
'gaps': [],
'recommendations': []
}
# Analyze competitor websites
competitor_analyses = []
for competitor_url in competitor_urls:
analysis = self.analyze_website(competitor_url)
if analysis.get('success', False):
competitor_analyses.append(analysis['data'])
# Generate content gap analysis using AI
prompt = f"""Analyze content gaps between the target website and competitors:
Target Website:
{json.dumps(target_analysis['data'], indent=2)}
Competitor Websites:
{json.dumps(competitor_analyses, indent=2)}
Identify:
1. Missing content topics
2. Content depth differences
3. Keyword gaps
4. Content structure improvements
5. Content quality recommendations
Format the response as JSON with 'gaps' and 'recommendations' keys."""
# Get AI analysis
analysis = llm_text_gen(
prompt=prompt,
system_prompt="You are an SEO expert specializing in content gap analysis.",
response_format="json_object"
)
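            if not analysis:
                return {
                    'error': 'Failed to generate content gap analysis',
                    'gaps': [],
                    'recommendations': []
                }
            # llm_text_gen may return the JSON payload as a string; decode it before reading keys
            if isinstance(analysis, str):
                analysis = json.loads(analysis)
            return {
                'gaps': analysis.get('gaps', []),
                'recommendations': analysis.get('recommendations', [])
            }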
except Exception as e:
error_msg = f"Error analyzing content gaps: {str(e)}"
logger.exception(error_msg)  # loguru has no exc_info flag; exception() logs the traceback
return {
'error': error_msg,
'gaps': [],
'recommendations': []
}
def analyze(self, url: str) -> Dict[str, Any]:
"""
Analyze a website for content gaps and SEO opportunities.
Args:
url: The URL to analyze
Returns:
Dictionary containing analysis results
"""
try:
# Initialize progress tracking
progress = {
'status': 'in_progress',
'current_stage': 'content_analysis',
'current_step': 'Initializing analysis',
'progress': 0,
'details': 'Starting website analysis...'
}
self.progress.update(progress)
# Get base website analysis
logger.info("Starting base website analysis")
website_analysis = self.analyze_website(url)
if not website_analysis.get('success', False):
error_msg = website_analysis.get('error', 'Unknown error in website analysis')
logger.error(f"Error in website analysis: {error_msg}")
progress['status'] = 'error'
progress['details'] = error_msg
self.progress.update(progress)
return {
'error': error_msg,
'error_details': website_analysis.get('error_details', {}),
'progress': progress
}
# Extract SEO metrics from the analysis
seo_metrics = self._extract_seo_metrics(website_analysis['data'])
# Extract performance metrics
performance_metrics = self._extract_performance_metrics(website_analysis['data'])
# Update progress
progress['status'] = 'completed'
progress['progress'] = 100
progress['details'] = 'Analysis completed successfully'
self.progress.update(progress)
return {
'success': True,
'data': {
'seo_metrics': seo_metrics,
'performance_metrics': performance_metrics,
'website_analysis': website_analysis['data']
},
'progress': progress
}
except Exception as e:
error_msg = f"Error in content gap analysis: {str(e)}"
logger.exception(error_msg)  # loguru has no exc_info flag; exception() logs the traceback
progress['status'] = 'error'
progress['details'] = error_msg
self.progress.update(progress)
return {
'error': error_msg,
'error_details': {
'type': type(e).__name__,
'traceback': str(e.__traceback__)
},
'progress': progress
}
def _extract_seo_metrics(self, website_analysis: Dict[str, Any]) -> Dict[str, Any]:
"""Extract SEO-related metrics from website analysis."""
try:
seo_info = website_analysis.get('analysis', {}).get('seo_info', {})
return {
'overall_score': seo_info.get('overall_score', 0),
'meta_tags': {
'title': seo_info.get('meta_tags', {}).get('title', {}),
'description': seo_info.get('meta_tags', {}).get('description', {}),
'keywords': seo_info.get('meta_tags', {}).get('keywords', {})
},
'content': {
'word_count': seo_info.get('content', {}).get('word_count', 0),
'readability_score': seo_info.get('content', {}).get('readability_score', 0),
'content_quality_score': seo_info.get('content', {}).get('content_quality_score', 0)
}
}
except Exception as e:
logger.exception(f"Error extracting SEO metrics: {str(e)}")
return {}
def _extract_performance_metrics(self, website_analysis: Dict[str, Any]) -> Dict[str, Any]:
"""Extract performance metrics from website analysis."""
try:
performance_info = website_analysis.get('analysis', {}).get('performance', {})
return {
'load_time': performance_info.get('load_time', 0),
'page_size': performance_info.get('page_size', 0),
'resource_count': performance_info.get('resource_count', 0),
'performance_score': performance_info.get('performance_score', 0)
}
except Exception as e:
logger.exception(f"Error extracting performance metrics: {str(e)}")
return {}
def _extract_content_metrics(self, website_analysis: Dict[str, Any]) -> Dict[str, Any]:
"""Extract content-related metrics from website analysis."""
try:
content_info = website_analysis['analysis']['content_info']
return {
'word_count': content_info.get('word_count', 0),
'heading_count': content_info.get('heading_count', 0),
'image_count': content_info.get('image_count', 0),
'link_count': content_info.get('link_count', 0),
'has_meta_description': content_info.get('has_meta_description', False),
'has_robots_txt': content_info.get('has_robots_txt', False),
'has_sitemap': content_info.get('has_sitemap', False)
}
except Exception as e:
logger.exception(f"Error extracting content metrics: {str(e)}")
return {}
def _extract_technical_info(self, website_analysis: Dict[str, Any]) -> Dict[str, Any]:
"""Extract technical information from website analysis."""
try:
basic_info = website_analysis.get('analysis', {}).get('basic_info', {})
return {
'title': basic_info.get('title', ''),
'meta_description': basic_info.get('meta_description', ''),
'headers': basic_info.get('headers', {}),
'robots_txt': basic_info.get('robots_txt', ''),
'sitemap': basic_info.get('sitemap', ''),
'server_info': basic_info.get('server_info', {}),
'security_info': basic_info.get('security_info', {})
}
except Exception as e:
logger.exception(f"Error extracting technical info: {str(e)}")
return {}
def _generate_insights(self, content_metrics: Dict[str, Any], seo_metrics: Dict[str, Any]) -> List[str]:
"""Generate content insights based on analysis results."""
try:
insights = []
# Content insights
if content_metrics['word_count'] < 300:
insights.append("Content length is below recommended minimum (300 words)")
elif content_metrics['word_count'] > 2000:
insights.append("Content length is above recommended maximum (2000 words)")
if content_metrics['heading_count'] < 2:
insights.append("Content structure could be improved with more headings")
if content_metrics['image_count'] == 0:
insights.append("Consider adding images to improve content engagement")
# SEO insights
if seo_metrics.get('overall_score', 0) < 60:
insights.append("SEO optimization needs significant improvement")
elif seo_metrics.get('overall_score', 0) < 80:
insights.append("SEO optimization has room for improvement")
if not content_metrics['has_meta_description']:
insights.append("Missing meta description - important for SEO")
if not content_metrics['has_robots_txt']:
insights.append("Missing robots.txt - important for search engine crawling")
if not content_metrics['has_sitemap']:
insights.append("Missing sitemap.xml - important for search engine indexing")
return insights
except Exception as e:
logger.exception(f"Error generating insights: {str(e)}")
return []
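
`analyze_content_gaps` asks the model for JSON with `gaps` and `recommendations` keys and then reads those keys from the response. A minimal sketch of defensive parsing for that response follows. It assumes the model output may arrive as a JSON string, an already-decoded dictionary, or nothing at all; `parse_gap_response` is a hypothetical helper, and the actual return type of `llm_text_gen` is not shown in this diff.

```python
import json
from typing import Any, Dict, Union


def parse_gap_response(raw: Union[str, Dict[str, Any], None]) -> Dict[str, Any]:
    """Return a dict with 'gaps' and 'recommendations', plus 'error' on failure."""
    empty: Dict[str, Any] = {"gaps": [], "recommendations": []}
    if not raw:
        return {**empty, "error": "Failed to generate content gap analysis"}
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except json.JSONDecodeError as exc:
            return {**empty, "error": f"Invalid JSON from model: {exc}"}
    if not isinstance(raw, dict):
        return {**empty, "error": "Model response is not a JSON object"}
    return {
        "gaps": raw.get("gaps", []),
        "recommendations": raw.get("recommendations", []),
    }


if __name__ == "__main__":
    print(parse_gap_response('{"gaps": ["pricing page"], "recommendations": []}'))
```

Returning the same keys on every path means callers can iterate over `gaps` and `recommendations` without first checking whether the call succeeded.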