Files
ALwrity/backend/services/seo_tools/sitemap_service.py

1261 lines
52 KiB
Python

"""
Sitemap Analysis Service
AI-enhanced sitemap analyzer that provides insights into website structure,
content distribution, and publishing patterns for SEO optimization.
"""
import aiohttp
import asyncio
import re
import json
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
from loguru import logger
import xml.etree.ElementTree as ET
from urllib.parse import urlparse, urljoin
import pandas as pd
import gzip
from ..llm_providers.main_text_generation import llm_text_gen
from middleware.logging_middleware import seo_logger
class SitemapService:
"""Service for analyzing website sitemaps with AI insights"""
def __init__(self):
"""Initialize the sitemap service"""
self.service_name = "sitemap_analyzer"
logger.info(f"Initialized {self.service_name}")
# Common sitemap paths to check
self.common_sitemap_paths = [
"sitemap.xml",
"sitemap_index.xml",
"sitemap/index.xml",
"sitemap.php",
"sitemap.txt",
"sitemap.xml.gz",
"sitemap1.xml",
# Common CMS/plugin paths
"wp-sitemap.xml", # WordPress 5.5+ default
"post-sitemap.xml",
"page-sitemap.xml",
"product-sitemap.xml", # WooCommerce
"category-sitemap.xml",
# Common feed paths that can act as sitemaps
"rss/",
"rss.xml",
"atom.xml",
]
async def analyze_sitemap(
self,
sitemap_url: str,
analyze_content_trends: bool = True,
analyze_publishing_patterns: bool = True,
include_ai_insights: bool = True,
user_id: Optional[str] = None
) -> Dict[str, Any]:
"""
Analyze website sitemap for structure and patterns
Args:
sitemap_url: URL of the sitemap to analyze
analyze_content_trends: Whether to analyze content trends
analyze_publishing_patterns: Whether to analyze publishing patterns
Returns:
Dictionary containing sitemap analysis and AI insights
"""
try:
start_time = datetime.utcnow()
if not sitemap_url:
raise ValueError("Sitemap URL is required")
logger.info(f"Analyzing sitemap: {sitemap_url}")
# Fetch and parse sitemap data
sitemap_data = await self._fetch_sitemap_data(sitemap_url)
if not sitemap_data:
raise Exception("Failed to fetch sitemap data")
# Analyze sitemap structure
structure_analysis = self._analyze_sitemap_structure(sitemap_data)
# Analyze content trends if requested
content_trends = {}
if analyze_content_trends and sitemap_data.get("urls"):
content_trends = self._analyze_content_trends(sitemap_data["urls"])
# Analyze publishing patterns if requested
publishing_patterns = {}
if analyze_publishing_patterns and sitemap_data.get("urls"):
publishing_patterns = self._analyze_publishing_patterns(sitemap_data["urls"])
ai_insights = {}
if include_ai_insights:
ai_insights = await self._generate_ai_insights(
structure_analysis, content_trends, publishing_patterns, sitemap_url, user_id=user_id
)
execution_time = (datetime.utcnow() - start_time).total_seconds()
result = {
"sitemap_url": sitemap_url,
"analysis_date": datetime.utcnow().isoformat(),
"total_urls": len(sitemap_data.get("urls", [])),
"structure_analysis": structure_analysis,
"content_trends": content_trends,
"publishing_patterns": publishing_patterns,
"ai_insights": ai_insights,
"seo_recommendations": self._generate_seo_recommendations(
structure_analysis, content_trends, publishing_patterns
),
"execution_time": execution_time
}
# Log the operation
await seo_logger.log_tool_usage(
tool_name=self.service_name,
input_data={
"sitemap_url": sitemap_url,
"analyze_content_trends": analyze_content_trends,
"analyze_publishing_patterns": analyze_publishing_patterns,
"include_ai_insights": include_ai_insights
},
output_data=result,
success=True
)
logger.info(f"Sitemap analysis completed for {sitemap_url}")
return result
except Exception as e:
logger.error(f"Error analyzing sitemap {sitemap_url}: {e}")
# Log the error
await seo_logger.log_tool_usage(
tool_name=self.service_name,
input_data={
"sitemap_url": sitemap_url,
"analyze_content_trends": analyze_content_trends,
"analyze_publishing_patterns": analyze_publishing_patterns
},
output_data={"error": str(e)},
success=False
)
raise
async def _fetch_sitemap_data(self, sitemap_url: str, depth: int = 0, session: aiohttp.ClientSession = None) -> Dict[str, Any]:
"""Fetch and parse sitemap data"""
# Reduced max depth from 3 to 2 to prevent infinite recursion/hanging on massive sites
if depth > 2:
logger.info(f"🛑 Max recursion depth (2) reached for sitemap {sitemap_url}")
return {"urls": [], "sitemaps": [], "total_urls": 0}
# Use passed session or create a new local one if it's the top-level call
local_session = False
if session is None:
local_session = True
# Limit pool size and set strict timeouts
connector = aiohttp.TCPConnector(limit_per_host=5, force_close=True)
# Increased total timeout to 60s for slow sitemaps, but kept connect/read strict
timeout = aiohttp.ClientTimeout(total=60, connect=10, sock_read=30)
session = aiohttp.ClientSession(connector=connector, timeout=timeout)
try:
logger.info(f"🔍 Fetching sitemap: {sitemap_url} (depth={depth})")
# 10MB limit for sitemaps
MAX_SITEMAP_SIZE = 10 * 1024 * 1024
try:
async with session.get(sitemap_url) as response:
if response.status != 200:
raise Exception(f"Failed to fetch sitemap: HTTP {response.status}")
# Check Content-Type header
content_type = response.headers.get("Content-Type", "").lower()
if "text/html" in content_type:
raise Exception("URL returned a webpage (HTML), not a valid XML sitemap")
# Check Content-Length header if available
content_length = response.headers.get("Content-Length")
if content_length and int(content_length) > MAX_SITEMAP_SIZE:
raise Exception(f"Sitemap too large: {content_length} bytes")
# Read with size limit (safe read)
raw = await response.content.read(MAX_SITEMAP_SIZE + 1)
if len(raw) > MAX_SITEMAP_SIZE:
raise Exception(f"Sitemap size exceeds limit of {MAX_SITEMAP_SIZE} bytes")
if sitemap_url.lower().endswith(".gz") or (len(raw) >= 2 and raw[0] == 0x1F and raw[1] == 0x8B):
try:
raw = gzip.decompress(raw)
except Exception:
pass
try:
content = raw.decode(response.charset or "utf-8", errors="replace")
except Exception:
content = raw.decode("utf-8", errors="replace")
content_stripped = content.lstrip()
if not content_stripped.startswith("<"):
urls = []
# Limit text sitemaps to 50k lines
lines = content.splitlines()[:50000]
for line in lines:
line_clean = (line or "").strip()
if not line_clean or line_clean.startswith("#"):
continue
if line_clean.startswith("http://") or line_clean.startswith("https://"):
urls.append({"loc": line_clean})
return {
"urls": urls,
"sitemaps": [],
"total_urls": len(urls)
}
# Check for HTML content disguised as XML
if content.strip().lower().startswith(("<!doctype html", "<html")):
raise Exception("URL returned a webpage (HTML), not a valid XML sitemap")
# Use defusedxml for safety if available, otherwise standard ET
try:
import defusedxml.ElementTree as DET
root = DET.fromstring(content)
except ImportError:
root = ET.fromstring(content)
# Handle different sitemap formats
urls = []
sitemaps = []
# Check if it's a sitemap index
if root.tag.endswith('sitemapindex'):
# Extract nested sitemaps
for sitemap in root:
if sitemap.tag.endswith('sitemap'):
loc = sitemap.find('.//{http://www.sitemaps.org/schemas/sitemap/0.9}loc')
if loc is not None:
sitemaps.append(loc.text)
# Fetch and parse nested sitemaps in parallel
nested_tasks = []
# Reduced nested limit from 10 to 5 to prevent fan-out explosion
for nested_url in sitemaps[:5]:
nested_tasks.append(self._fetch_sitemap_data(nested_url, depth + 1, session))
if nested_tasks:
nested_results = await asyncio.gather(*nested_tasks, return_exceptions=True)
for res in nested_results:
if isinstance(res, Exception):
logger.warning(f"Failed to fetch nested sitemap: {res}")
elif isinstance(res, dict):
urls.extend(res.get("urls", []))
else:
# Regular sitemap with URLs
# Limit to first 10k URLs per sitemap file to prevent memory issues
url_count = 0
for url_element in root:
if url_count >= 10000:
break
if url_element.tag.endswith('url'):
url_data = {}
for child in url_element:
tag_name = child.tag.split('}')[-1] # Remove namespace
url_data[tag_name] = child.text
if 'loc' in url_data:
urls.append(url_data)
url_count += 1
return {
"urls": urls,
"sitemaps": sitemaps,
"total_urls": len(urls)
}
except Exception as e:
# Re-raise to be caught by outer try/except
raise e
except ET.ParseError as e:
# Check if content is empty
if not content or not content.strip():
logger.warning(f"Sitemap is empty: {sitemap_url}")
return {"urls": [], "sitemaps": [], "total_urls": 0}
# Check if content looks like HTML to give a better error message
try:
if "content" in locals() and ("<html" in content.lower() or "<body" in content.lower() or "<div" in content.lower()):
raise Exception("URL returned a webpage (HTML), not a valid XML sitemap")
except Exception:
pass
logger.warning(f"Failed to parse sitemap XML: {e}")
raise Exception(f"Failed to parse sitemap XML: {e}")
except Exception as e:
if "no element found" in str(e) or "not a valid XML sitemap" in str(e):
logger.warning(f"⚠️ Sitemap parsing failed for {sitemap_url}: {e}")
else:
logger.error(f"Error fetching sitemap data for {sitemap_url}: {e}")
raise
finally:
# Only close the session if we created it
if local_session and session:
await session.close()
def _analyze_sitemap_structure(self, sitemap_data: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze the structure of the sitemap"""
urls = sitemap_data.get("urls", [])
if not urls:
return {"error": "No URLs found in sitemap"}
# Analyze URL patterns
url_patterns = {}
file_types = {}
path_levels = []
for url_info in urls:
url = url_info.get("loc", "")
parsed_url = urlparse(url)
# Analyze path patterns
path_parts = parsed_url.path.strip('/').split('/')
path_levels.append(len(path_parts))
# Categorize by first path segment
if len(path_parts) > 0 and path_parts[0]:
category = path_parts[0]
url_patterns[category] = url_patterns.get(category, 0) + 1
# Analyze file types
if '.' in parsed_url.path:
extension = parsed_url.path.split('.')[-1].lower()
file_types[extension] = file_types.get(extension, 0) + 1
# Calculate statistics
avg_path_depth = sum(path_levels) / len(path_levels) if path_levels else 0
# Enhancement: Keyword Clustering & Strategic Pillar Mapping
keyword_clusters = self._cluster_keywords_from_urls(urls)
strategic_pillars = self._map_strategic_pillars(urls)
return {
"total_urls": len(urls),
"url_patterns": dict(sorted(url_patterns.items(), key=lambda x: x[1], reverse=True)[:10]),
"file_types": dict(sorted(file_types.items(), key=lambda x: x[1], reverse=True)),
"average_path_depth": round(avg_path_depth, 2),
"max_path_depth": max(path_levels) if path_levels else 0,
"keyword_clusters": keyword_clusters,
"strategic_pillars": strategic_pillars,
"structure_quality": self._assess_structure_quality(url_patterns, avg_path_depth)
}
def _cluster_keywords_from_urls(self, urls: List[Dict[str, Any]]) -> Dict[str, int]:
"""Extract and cluster keywords from URL slugs to identify content strategy focus."""
stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'with', 'by', 'of', 'from', 'category', 'tag', 'blog', 'posts', 'archive'}
keywords: Dict[str, int] = {}
for u in urls[:1000]: # Sample 1000 for performance
path = urlparse(u.get('loc', '')).path
# Split by non-alphanumeric and underscores
parts = re.split(r'[^a-zA-Z0-9]', path)
for part in parts:
p = part.lower()
if len(p) > 3 and p not in stop_words and not p.isdigit():
keywords[p] = keywords.get(p, 0) + 1
# Return top 15 clusters
return dict(sorted(keywords.items(), key=lambda x: x[1], reverse=True)[:15])
def _map_strategic_pillars(self, urls: List[Dict[str, Any]]) -> Dict[str, int]:
"""Categorize URLs into strategic content pillars based on common path patterns."""
pillars = {
"Educational": ["blog", "guides", "how-to", "learn", "academy", "resource", "documentation", "docs"],
"Transactional": ["product", "features", "pricing", "plans", "solutions", "buy", "checkout", "cart"],
"Comparison": ["vs", "alternative", "comparison", "reviews", "best-of"],
"Company": ["about", "careers", "press", "contact", "team", "legal", "privacy", "terms"],
"Tools": ["calculator", "tool", "generator", "checker", "analyzer"]
}
results = {k: 0 for k in pillars}
for u in urls:
loc = u.get('loc', '').lower()
found = False
for pillar, tokens in pillars.items():
if any(token in loc for token in tokens):
results[pillar] += 1
found = True
break
# Optional: Add "Other" category if needed
return results
def _analyze_content_trends(self, urls: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analyze content publishing trends"""
# Extract dates from lastmod
dates = []
for url_info in urls:
lastmod = url_info.get("lastmod")
if lastmod:
try:
# Parse various date formats
date_str = lastmod.split('T')[0] # Remove time component
date_obj = datetime.strptime(date_str, "%Y-%m-%d")
dates.append(date_obj)
except ValueError:
continue
if not dates:
return {"message": "No valid dates found for trend analysis"}
# Analyze trends
dates.sort()
# Monthly distribution
monthly_counts = {}
yearly_counts = {}
for date in dates:
month_key = date.strftime("%Y-%m")
year_key = date.strftime("%Y")
monthly_counts[month_key] = monthly_counts.get(month_key, 0) + 1
yearly_counts[year_key] = yearly_counts.get(year_key, 0) + 1
# Calculate publishing velocity
date_range = (dates[-1] - dates[0]).days
publishing_velocity = len(dates) / max(date_range, 1) if date_range > 0 else 0
return {
"date_range": {
"earliest": dates[0].isoformat(),
"latest": dates[-1].isoformat(),
"span_days": date_range
},
"monthly_distribution": dict(sorted(monthly_counts.items())[-12:]), # Last 12 months
"yearly_distribution": yearly_counts,
"publishing_velocity": round(publishing_velocity, 3),
"total_dated_urls": len(dates),
"trends": self._identify_publishing_trends(monthly_counts)
}
def _analyze_publishing_patterns(self, urls: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analyze publishing patterns and frequency"""
# Extract and analyze priority and changefreq
priority_distribution = {}
changefreq_distribution = {}
for url_info in urls:
priority = url_info.get("priority")
if priority:
try:
priority_float = float(priority)
priority_range = f"{int(priority_float * 10)}/10"
priority_distribution[priority_range] = priority_distribution.get(priority_range, 0) + 1
except ValueError:
pass
changefreq = url_info.get("changefreq")
if changefreq:
changefreq_distribution[changefreq] = changefreq_distribution.get(changefreq, 0) + 1
return {
"priority_distribution": priority_distribution,
"changefreq_distribution": changefreq_distribution,
"optimization_opportunities": self._identify_optimization_opportunities(
priority_distribution, changefreq_distribution, len(urls)
)
}
async def analyze_sitemap_for_onboarding(
self,
sitemap_url: str,
user_url: str,
competitors: List[str] = None,
industry_context: str = None,
analyze_content_trends: bool = True,
analyze_publishing_patterns: bool = True,
include_ai_insights: bool = True,
user_id: Optional[str] = None
) -> Dict[str, Any]:
"""Enhanced sitemap analysis specifically for onboarding Step 3 competitive analysis"""
try:
# Run standard sitemap analysis
analysis_result = await self.analyze_sitemap(
sitemap_url=sitemap_url,
analyze_content_trends=analyze_content_trends,
analyze_publishing_patterns=analyze_publishing_patterns,
include_ai_insights=include_ai_insights,
user_id=user_id
)
# Enhance with onboarding-specific insights
onboarding_insights = await self._generate_onboarding_insights(
analysis_result,
user_url,
competitors,
industry_context,
user_id=user_id
)
# Combine results
analysis_result["onboarding_insights"] = onboarding_insights
analysis_result["user_url"] = user_url
analysis_result["industry_context"] = industry_context
analysis_result["competitors_analyzed"] = competitors or []
return analysis_result
except Exception as e:
logger.error(f"Error in onboarding sitemap analysis: {e}")
return {
"error": str(e),
"success": False
}
async def _generate_onboarding_insights(
self,
analysis_result: Dict[str, Any],
user_url: str,
competitors: List[str] = None,
industry_context: str = None,
user_id: Optional[str] = None
) -> Dict[str, Any]:
"""Generate onboarding-specific insights for competitive analysis"""
try:
structure_analysis = analysis_result.get("structure_analysis", {})
content_trends = analysis_result.get("content_trends", {})
publishing_patterns = analysis_result.get("publishing_patterns", {})
# Build onboarding-specific prompt
prompt = self._build_onboarding_analysis_prompt(
structure_analysis, content_trends, publishing_patterns,
user_url, competitors, industry_context
)
# Define JSON schema for structured output
json_struct = {
"type": "object",
"properties": {
"competitive_positioning": {"type": "string"},
"content_gaps": {
"type": "array",
"items": {"type": "string"}
},
"growth_opportunities": {
"type": "array",
"items": {"type": "string"}
},
"industry_benchmarks": {
"type": "array",
"items": {"type": "string"}
},
"strategic_recommendations": {
"type": "array",
"items": {"type": "string"}
}
},
"required": ["competitive_positioning", "content_gaps", "growth_opportunities", "industry_benchmarks", "strategic_recommendations"]
}
# Generate AI insights
ai_response = llm_text_gen(
prompt=prompt,
system_prompt=self._get_onboarding_system_prompt(),
json_struct=json_struct,
user_id=user_id
)
# Parse and structure insights
insights = self._parse_onboarding_insights(ai_response)
# Log AI analysis
await seo_logger.log_ai_analysis(
tool_name=f"{self.service_name}_onboarding",
prompt=prompt,
response=ai_response if isinstance(ai_response, str) else str(ai_response),
model_used="gemini-2.0-flash-001"
)
return insights
except Exception as e:
logger.error(f"Error generating onboarding insights: {e}")
return {
"competitive_positioning": "Analysis unavailable",
"content_gaps": [],
"growth_opportunities": [],
"industry_benchmarks": []
}
async def _generate_ai_insights(
self,
structure_analysis: Dict[str, Any],
content_trends: Dict[str, Any],
publishing_patterns: Dict[str, Any],
sitemap_url: str,
user_id: Optional[str] = None
) -> Dict[str, Any]:
"""Generate AI-powered insights for sitemap analysis"""
try:
# Build prompt with analysis data
prompt = self._build_ai_analysis_prompt(
structure_analysis, content_trends, publishing_patterns, sitemap_url
)
# Generate AI insights
ai_response = llm_text_gen(
prompt=prompt,
system_prompt=self._get_system_prompt(),
user_id=user_id
)
# Parse and structure insights
insights = self._parse_ai_insights(ai_response)
# Log AI analysis
await seo_logger.log_ai_analysis(
tool_name=self.service_name,
prompt=prompt,
response=ai_response,
model_used="gemini-2.0-flash-001"
)
return insights
except Exception as e:
logger.error(f"Error generating AI insights: {e}")
return {
"summary": "AI analysis unavailable",
"content_strategy": [],
"seo_opportunities": [],
"technical_recommendations": []
}
def _build_ai_analysis_prompt(
self,
structure_analysis: Dict[str, Any],
content_trends: Dict[str, Any],
publishing_patterns: Dict[str, Any],
sitemap_url: str
) -> str:
"""Build AI prompt for sitemap analysis"""
total_urls = structure_analysis.get("total_urls", 0)
url_patterns = structure_analysis.get("url_patterns", {})
avg_depth = structure_analysis.get("average_path_depth", 0)
publishing_velocity = content_trends.get("publishing_velocity", 0)
date_range = content_trends.get("date_range", {})
prompt = f"""
Analyze this website sitemap data and provide strategic insights for content creators and digital marketers:
Sitemap URL: {sitemap_url}
Total URLs: {total_urls}
Average Path Depth: {avg_depth}
Publishing Velocity: {publishing_velocity} posts/day
URL Patterns (top categories):
{chr(10).join([f"- {category}: {count} URLs" for category, count in list(url_patterns.items())[:5]])}
Content Timeline:
- Date Range: {date_range.get('span_days', 0)} days
- Publishing Rate: {publishing_velocity:.2f} pages per day
Please provide:
1. Content Strategy Insights (opportunities for new content categories)
2. SEO Structure Assessment (how well the site is organized for search engines)
3. Publishing Pattern Analysis (content frequency and consistency)
4. Growth Recommendations (specific actions for content expansion)
5. Technical SEO Opportunities (sitemap optimization suggestions)
Focus on actionable insights for content creators and digital marketing professionals.
"""
return prompt
def _get_system_prompt(self) -> str:
"""Get system prompt for AI analysis"""
return """You are an SEO and content strategy expert specializing in website structure analysis.
Your audience includes content creators, digital marketers, and solopreneurs who need to understand how their site structure impacts SEO and content performance.
Provide practical, actionable insights that help users:
- Optimize their content strategy
- Improve site structure for SEO
- Identify content gaps and opportunities
- Plan future content development
Always explain the business impact of your recommendations.
"""
def _parse_ai_insights(self, ai_response: str) -> Dict[str, Any]:
"""Parse AI response into structured insights"""
insights = {
"summary": "",
"content_strategy": [],
"seo_opportunities": [],
"technical_recommendations": [],
"growth_recommendations": []
}
try:
# Split into sections and parse
sections = ai_response.split('\n\n')
for section in sections:
section = section.strip()
if not section:
continue
if 'content strategy' in section.lower():
insights["content_strategy"] = self._extract_list_items(section)
elif 'seo' in section.lower() and 'opportunities' in section.lower():
insights["seo_opportunities"] = self._extract_list_items(section)
elif 'technical' in section.lower():
insights["technical_recommendations"] = self._extract_list_items(section)
elif 'growth' in section.lower() or 'recommendations' in section.lower():
insights["growth_recommendations"] = self._extract_list_items(section)
elif 'analysis' in section.lower() or 'assessment' in section.lower():
insights["summary"] = self._extract_content(section)
# Fallback
if not any(insights.values()):
insights["summary"] = ai_response[:300] + "..." if len(ai_response) > 300 else ai_response
except Exception as e:
logger.error(f"Error parsing AI insights: {e}")
insights["summary"] = "AI analysis completed but parsing failed"
return insights
def _extract_content(self, section: str) -> str:
"""Extract content from a section"""
lines = section.split('\n')
content_lines = []
for line in lines:
line = line.strip()
if line and not line.endswith(':') and not line.startswith('#'):
content_lines.append(line)
return ' '.join(content_lines)
def _extract_list_items(self, section: str) -> List[str]:
"""Extract list items from a section"""
items = []
lines = section.split('\n')
for line in lines:
line = line.strip()
if line and (line.startswith('-') or line.startswith('*') or
(line[0].isdigit() and '.' in line[:3])):
clean_line = line.lstrip('-*0123456789. ').strip()
if clean_line:
items.append(clean_line)
return items[:5]
def _assess_structure_quality(self, url_patterns: Dict[str, int], avg_depth: float) -> str:
"""Assess the quality of site structure"""
if avg_depth < 2:
return "Shallow structure - may lack content organization"
elif avg_depth > 5:
return "Deep structure - may hurt crawlability"
elif len(url_patterns) < 3:
return "Limited content categories - opportunity for expansion"
else:
return "Well-structured site with good organization"
def _identify_publishing_trends(self, monthly_counts: Dict[str, int]) -> List[str]:
"""Identify publishing trends from monthly data"""
trends = []
if not monthly_counts or len(monthly_counts) < 3:
return ["Insufficient data for trend analysis"]
# Get recent months
recent_months = list(monthly_counts.values())[-6:] # Last 6 months
if len(recent_months) >= 3:
# Check for growth trend
if recent_months[-1] > recent_months[-3]:
trends.append("Increasing publishing frequency")
elif recent_months[-1] < recent_months[-3]:
trends.append("Decreasing publishing frequency")
# Check consistency
avg_posts = sum(recent_months) / len(recent_months)
if max(recent_months) - min(recent_months) <= avg_posts * 0.5:
trends.append("Consistent publishing schedule")
else:
trends.append("Irregular publishing pattern")
return trends or ["Stable publishing pattern"]
def _identify_optimization_opportunities(
self,
priority_dist: Dict[str, int],
changefreq_dist: Dict[str, int],
total_urls: int
) -> List[str]:
"""Identify sitemap optimization opportunities"""
opportunities = []
# Check if priorities are being used
if not priority_dist:
opportunities.append("Add priority values to sitemap URLs")
# Check if changefreq is being used
if not changefreq_dist:
opportunities.append("Add changefreq values to sitemap URLs")
# Check for overuse of high priority
high_priority_count = priority_dist.get("10/10", 0) + priority_dist.get("9/10", 0)
if high_priority_count > total_urls * 0.3:
opportunities.append("Reduce number of high-priority pages (max 30%)")
return opportunities or ["Sitemap is well-optimized"]
def _generate_seo_recommendations(
self,
structure_analysis: Dict[str, Any],
content_trends: Dict[str, Any],
publishing_patterns: Dict[str, Any]
) -> List[Dict[str, Any]]:
"""Generate specific SEO recommendations"""
recommendations = []
# Structure recommendations
total_urls = structure_analysis.get("total_urls", 0)
avg_depth = structure_analysis.get("average_path_depth", 0)
if avg_depth > 4:
recommendations.append({
"category": "Site Structure",
"priority": "High",
"recommendation": "Reduce URL depth to improve crawlability",
"impact": "Better search engine indexing"
})
if total_urls > 50000:
recommendations.append({
"category": "Sitemap Management",
"priority": "Medium",
"recommendation": "Split large sitemap into smaller files",
"impact": "Improved crawl efficiency"
})
# Content recommendations
publishing_velocity = content_trends.get("publishing_velocity", 0)
if publishing_velocity < 0.1: # Less than 1 post per 10 days
recommendations.append({
"category": "Content Strategy",
"priority": "High",
"recommendation": "Increase content publishing frequency",
"impact": "Better search visibility and freshness signals"
})
return recommendations
async def health_check(self) -> Dict[str, Any]:
"""Health check for the sitemap service"""
try:
# Test with a simple sitemap
test_url = "https://www.google.com/sitemap.xml"
result = await self.analyze_sitemap(
sitemap_url=test_url,
analyze_content_trends=False,
analyze_publishing_patterns=False,
include_ai_insights=False
)
return {
"status": "operational",
"service": self.service_name,
"test_passed": bool(result.get("total_urls", 0) > 0),
"last_check": datetime.utcnow().isoformat()
}
except Exception as e:
return {
"status": "error",
"service": self.service_name,
"error": str(e),
"last_check": datetime.utcnow().isoformat()
}
def _build_onboarding_analysis_prompt(
self,
structure_analysis: Dict[str, Any],
content_trends: Dict[str, Any],
publishing_patterns: Dict[str, Any],
user_url: str,
competitors: List[str] = None,
industry_context: str = None
) -> str:
"""Build AI prompt for onboarding-specific sitemap analysis"""
total_urls = structure_analysis.get("total_urls", 0)
url_patterns = structure_analysis.get("url_patterns", {})
avg_depth = structure_analysis.get("average_path_depth", 0)
publishing_velocity = content_trends.get("publishing_velocity", 0)
competitor_info = ""
if competitors:
competitor_info = f"\nCompetitors to consider: {', '.join(competitors)}"
industry_info = ""
if industry_context:
industry_info = f"\nIndustry Context: {industry_context}"
prompt = f"""
Analyze this website's sitemap for competitive positioning and content strategy insights:
USER WEBSITE: {user_url}
Total URLs: {total_urls}
Average Path Depth: {avg_depth}
Publishing Velocity: {publishing_velocity:.2f} posts/day
{industry_info}{competitor_info}
URL Structure Analysis:
{chr(10).join([f"- {category}: {count} URLs" for category, count in list(url_patterns.items())[:8]])}
Content Publishing Patterns:
- Publishing Rate: {publishing_velocity:.2f} pages per day
- Content Categories: {len(url_patterns)} main categories identified
Please provide competitive analysis insights focusing on the following sections:
1. **COMPETITIVE POSITIONING**: How does this site's content structure compare to industry standards? (Provide a brief paragraph)
2. **CONTENT GAPS**: What content categories or topics are missing based on the URL structure? (List 3-5 specific gaps)
3. **GROWTH OPPORTUNITIES**: Specific content expansion opportunities to compete better (List 3-5 opportunities)
4. **INDUSTRY BENCHMARKS**: How does publishing frequency and content depth compare to competitors? (List 3 key comparisons)
5. **STRATEGIC RECOMMENDATIONS**: 3-5 actionable steps for content strategy improvement
Focus on actionable insights that help content creators understand their competitive position and identify growth opportunities.
"""
return prompt
def _get_onboarding_system_prompt(self) -> str:
"""Get system prompt for onboarding sitemap analysis"""
return """You are a competitive intelligence and content strategy expert specializing in website structure analysis for content creators and digital marketers.
Your role is to analyze website sitemaps and provide strategic insights that help users understand their competitive position and identify content opportunities.
Key focus areas:
- Competitive positioning analysis
- Content gap identification
- Growth opportunity recommendations
- Industry benchmarking insights
- Actionable strategic recommendations
Provide practical, data-driven insights that help content creators make informed decisions about their content strategy and competitive positioning.
Format your response as structured insights that can be easily parsed and displayed in a user interface."""
def _parse_onboarding_insights(self, ai_response: Any) -> Dict[str, Any]:
"""Parse AI response for onboarding-specific insights"""
try:
insights = {}
# If it's already a dict (structured output), use it
if isinstance(ai_response, dict):
insights = ai_response
elif isinstance(ai_response, str):
# Try to parse JSON string
try:
insights = json.loads(ai_response)
except json.JSONDecodeError:
# Try to extract JSON from markdown block
json_match = re.search(r'```json\s*(.*?)\s*```', ai_response, re.DOTALL)
if json_match:
try:
insights = json.loads(json_match.group(1))
except json.JSONDecodeError:
pass
# Ensure all required keys exist
required_keys = [
"competitive_positioning",
"content_gaps",
"growth_opportunities",
"industry_benchmarks",
"strategic_recommendations"
]
# Validate and fill missing keys
validated_insights = {
"competitive_positioning": insights.get("competitive_positioning", "Analysis in progress..."),
"content_gaps": insights.get("content_gaps", []),
"growth_opportunities": insights.get("growth_opportunities", []),
"industry_benchmarks": insights.get("industry_benchmarks", []),
"strategic_recommendations": insights.get("strategic_recommendations", [])
}
# Ensure lists are actually lists
for key in required_keys[1:]:
if not isinstance(validated_insights[key], list):
if isinstance(validated_insights[key], str):
validated_insights[key] = [validated_insights[key]]
else:
validated_insights[key] = []
return validated_insights
except Exception as e:
logger.error(f"Error parsing onboarding insights: {e}")
return {
"competitive_positioning": "Analysis unavailable",
"content_gaps": [],
"growth_opportunities": [],
"industry_benchmarks": [],
"strategic_recommendations": []
}
async def discover_sitemap_url(self, website_url: str) -> Optional[str]:
"""
Intelligently discover the sitemap URL for a given website.
Args:
website_url: The website URL to find sitemap for
Returns:
The discovered sitemap URL or None if not found
"""
try:
# Ensure the URL has a proper scheme
if not urlparse(website_url).scheme:
base_url = f"https://{website_url}"
else:
base_url = website_url.rstrip('/')
logger.info(f"Discovering sitemap for: {base_url}")
# Method 1: Check robots.txt first (most reliable)
sitemap_url = await self._find_sitemap_in_robots_txt(base_url)
if sitemap_url:
logger.info(f"Found sitemap via robots.txt: {sitemap_url}")
return sitemap_url
# Method 2: Check common paths
sitemap_url = await self._find_sitemap_by_common_paths(base_url)
if sitemap_url:
logger.info(f"Found sitemap via common paths: {sitemap_url}")
return sitemap_url
logger.warning(f"No sitemap found for {base_url}")
return None
except Exception as e:
logger.error(f"Error discovering sitemap for {website_url}: {e}")
return None
async def _find_sitemap_on_homepage(self, base_url: str) -> Optional[str]:
"""
Check homepage for sitemap links in HTML.
Args:
base_url: Base URL of the website
Returns:
Sitemap URL if found on homepage, None otherwise
"""
try:
logger.debug(f"Checking homepage for sitemap links: {base_url}")
async with aiohttp.ClientSession() as session:
async with session.get(base_url, timeout=aiohttp.ClientTimeout(total=15), headers={"User-Agent": "ALwrity-SEO-Bot/1.0"}) as response:
if response.status == 200:
content = await response.text()
# Look for sitemap links in href attributes
# Matches: href="...sitemap.xml..." or href='...sitemap.xml...'
# Simple regex to catch common variations
sitemap_matches = re.findall(r'href=["\']([^"\']*[sS]itemap[^"\']*\.xml[^"\']*)["\']', content)
for match in sitemap_matches:
potential_url = match.strip()
# Handle relative URLs
if not potential_url.startswith(('http://', 'https://')):
potential_url = urljoin(base_url, potential_url)
logger.debug(f"Found potential sitemap link on homepage: {potential_url}")
# Verify accessibility
if await self._check_sitemap_url(potential_url, "homepage link"):
return potential_url
return None
except Exception as e:
logger.debug(f"Error checking homepage for sitemap: {e}")
return None
async def _find_sitemap_in_robots_txt(self, base_url: str) -> Optional[str]:
"""
Check robots.txt for sitemap directives.
Args:
base_url: Base URL of the website
Returns:
Sitemap URL if found in robots.txt, None otherwise
"""
try:
robots_url = urljoin(base_url, "/robots.txt")
logger.debug(f"Checking robots.txt at: {robots_url}")
async with aiohttp.ClientSession() as session:
async with session.get(robots_url, timeout=aiohttp.ClientTimeout(total=10)) as response:
if response.status == 200:
content = await response.text()
# Look for sitemap directives (case-insensitive)
sitemap_matches = re.findall(r'^Sitemap:\s*(.+)', content, re.IGNORECASE | re.MULTILINE)
if sitemap_matches:
sitemap_url = sitemap_matches[0].strip()
logger.debug(f"Found sitemap directive in robots.txt: {sitemap_url}")
# Verify the sitemap URL is accessible
if await self._verify_sitemap_url(sitemap_url):
return sitemap_url
else:
logger.warning(f"robots.txt points to inaccessible sitemap: {sitemap_url}")
logger.debug("No sitemap directive found in robots.txt")
else:
logger.debug(f"robots.txt returned HTTP {response.status}")
except Exception as e:
logger.debug(f"Error checking robots.txt: {e}")
return None
async def _find_sitemap_by_common_paths(self, base_url: str) -> Optional[str]:
"""
Check common sitemap paths.
Args:
base_url: Base URL of the website
Returns:
Sitemap URL if found at common paths, None otherwise
"""
try:
logger.debug(f"Checking common sitemap paths for: {base_url}")
# Check paths in parallel for better performance
tasks = []
for path in self.common_sitemap_paths:
full_url = urljoin(base_url, path)
tasks.append(self._check_sitemap_url(full_url, f"common path: /{path}"))
# Wait for all checks to complete
results = await asyncio.gather(*tasks, return_exceptions=True)
# Return the first successful result
for result in results:
if isinstance(result, str) and result:
return result
logger.debug("No sitemap found at common paths")
except Exception as e:
logger.debug(f"Error checking common paths: {e}")
return None
async def _check_sitemap_url(self, url: str, method: str) -> Optional[str]:
"""
Check if a URL is a valid sitemap.
Args:
url: URL to check
method: Method description for logging
Returns:
URL if valid sitemap, None otherwise
"""
try:
headers = {
'User-Agent': 'ALwritySitemapBot/1.0 (https://alwrity.com)',
'Accept': 'application/xml, text/xml, */*'
}
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as response:
if response.status == 200:
content_type = response.headers.get('Content-Type', '').lower()
# Check if it's a valid sitemap content type
if any(xml_type in content_type for xml_type in ['xml', 'text', 'application/x-gzip']):
logger.debug(f"Found valid sitemap via {method}: {url} (Content-Type: {content_type})")
return url
else:
# Still consider it if it's 200 but not typical content type
logger.debug(f"Found potential sitemap via {method}: {url} (Content-Type: {content_type})")
return url
elif response.status == 404:
# Skip 404s silently
pass
else:
logger.debug(f"HTTP {response.status} for {url} via {method}")
except Exception as e:
# Skip connection errors silently
logger.debug(f"Connection error for {url}: {e}")
return None
async def _verify_sitemap_url(self, url: str) -> bool:
"""
Verify that a sitemap URL is accessible and returns valid content.
Args:
url: Sitemap URL to verify
Returns:
True if accessible, False otherwise
"""
try:
headers = {
'User-Agent': 'ALwritySitemapBot/1.0 (https://alwrity.com)',
'Accept': 'application/xml, text/xml, */*'
}
async with aiohttp.ClientSession() as session:
async with session.head(url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as response:
return response.status == 200
except Exception:
return False