# Files
# ALwrity/lib/utils/website_analyzer/analyzer.py
#
# 323 lines
# 12 KiB
# Python

"""Website scraping and AI analysis module."""
import asyncio
from typing import Dict, List, Optional
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import streamlit as st
import re
from loguru import logger
from ...web_crawlers.async_web_crawler import AsyncWebCrawlerService
from ...gpt_providers.text_generation.main_text_generation import llm_text_gen
import os
import sys
import logging
import json
from datetime import datetime
import requests
import ssl
import socket
import whois
import dns.resolver
from requests.exceptions import RequestException
from concurrent.futures import ThreadPoolExecutor
# Root logging setup, executed once at import time: DEBUG-level records are
# sent both to the console stream and to a log file in the working directory.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
_LOG_HANDLERS = [
    logging.StreamHandler(),
    logging.FileHandler('website_analyzer.log'),
]
logging.basicConfig(
    level=logging.DEBUG,
    format=_LOG_FORMAT,
    handlers=_LOG_HANDLERS,
)

# NOTE: this rebinds ``logger`` to a stdlib logger named after this module,
# shadowing the ``logger`` imported from loguru at the top of the file.
logger = logging.getLogger(__name__)
def analyze_website(url: str) -> Dict:
    """
    Run a full website analysis and wrap the outcome in a status envelope.

    Args:
        url (str): The URL to analyze

    Returns:
        Dict: ``{"success": True, "data": <results>}`` when the analyzer
        returns results without an ``"error"`` key, otherwise
        ``{"success": False, "error": <message>}``. Exceptions are logged
        and reported through the same failure shape instead of propagating.
    """
    logger.info(f"Starting website analysis for URL: {url}")
    try:
        outcome = WebsiteAnalyzer().analyze_website(url)
        failed = "error" in outcome
        # Failure carries only the message; success carries the full results.
        payload = {"error": outcome["error"]} if failed else {"data": outcome}
        return {"success": not failed, **payload}
    except Exception as e:
        logger.error(f"Error in analyze_website: {str(e)}", exc_info=True)
        return {"success": False, "error": str(e)}
class WebsiteAnalyzer:
    """Gather HTTP, TLS, DNS, WHOIS, content and timing data for one website.

    Every HTTP request goes through a single ``requests.Session`` that sends
    a desktop-browser User-Agent. Each private check catches its own
    exceptions and returns an error dict or fallback value, so one failing
    check does not discard the results of the others.
    """

    # Timeouts in seconds: page fetches / TLS connect, and auxiliary files.
    _PAGE_TIMEOUT = 10
    _AUX_TIMEOUT = 5
    _DNS_RECORD_TYPES = ('A', 'AAAA', 'MX', 'NS', 'TXT')

    def __init__(self):
        """Create the shared HTTP session and set its User-Agent header."""
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })
        logger.info("WebsiteAnalyzer initialized")

    def analyze_website(self, url: str) -> Dict:
        """
        Perform comprehensive analysis of a website.

        Args:
            url (str): The URL to analyze

        Returns:
            Dict: On success a dict with ``url``, ``domain``, ``timestamp``
            and an ``analysis`` sub-dict keyed by ``basic_info``,
            ``ssl_info``, ``dns_info``, ``whois_info``, ``content_info`` and
            ``performance``. On an invalid URL or unexpected failure,
            ``{"error": <message>}``.
        """
        logger.info(f"Starting analysis for URL: {url}")
        try:
            if not self._validate_url(url):
                logger.error(f"Invalid URL format: {url}")
                return {"error": "Invalid URL format"}

            parsed_url = urlparse(url)
            domain = parsed_url.netloc
            # netloc may include "user:pass@" and ":port"; the socket, DNS
            # and WHOIS lookups need the bare host name instead.
            host = parsed_url.hostname or domain
            logger.debug(f"Parsed domain: {domain}")

            results = {
                "url": url,
                "domain": domain,
                "timestamp": datetime.now().isoformat(),
                "analysis": {}
            }

            independent_checks = (
                ("basic_info", self._get_basic_info, url),
                ("ssl_info", self._check_ssl, host),
                ("dns_info", self._check_dns, host),
                ("whois_info", self._get_whois_info, host),
                ("content_info", self._analyze_content, url),
            )
            # Submit everything before collecting any result; calling
            # .result() right after each submit would serialize the checks.
            # NOTE(review): the workers share self.session; requests does not
            # document Session as thread-safe -- confirm this is acceptable.
            with ThreadPoolExecutor(max_workers=4) as executor:
                pending = [
                    (key, executor.submit(check, target))
                    for key, check, target in independent_checks
                ]
                for key, future in pending:
                    results["analysis"][key] = future.result()

            # Timed on its own so concurrent requests do not inflate load_time.
            results["analysis"]["performance"] = self._check_performance(url)

            logger.info(f"Analysis completed successfully for {url}")
            return results
        except Exception as e:
            logger.error(f"Error during website analysis: {str(e)}", exc_info=True)
            return {"error": str(e)}

    def _validate_url(self, url: str) -> bool:
        """Return True when *url* parses with both a scheme and a netloc."""
        try:
            result = urlparse(url)
            return all([result.scheme, result.netloc])
        except Exception as e:
            logger.error(f"URL validation error: {str(e)}")
            return False

    @staticmethod
    def _root_resource_url(url: str, resource: str) -> str:
        """Return the URL of *resource* at the root of *url*'s site.

        The path, query and fragment of *url* are dropped; scheme, host and
        port are kept.
        """
        return urljoin(url, f"/{resource}")

    @staticmethod
    def _format_whois_date(date_value) -> str:
        """Normalize a WHOIS date field to a string.

        Accepts a date/datetime, a string, a list of either (the first entry
        is used) or an empty/None value, which yields ``'Unknown'``.
        """
        if isinstance(date_value, (list, tuple)):
            date_value = date_value[0] if date_value else None
        if not date_value:
            return 'Unknown'
        if hasattr(date_value, 'isoformat'):
            return date_value.isoformat()
        # Some records carry dates that were never parsed; keep them as text.
        return str(date_value)

    def _get_basic_info(self, url: str) -> Dict:
        """Fetch *url* and return status, headers, title and related files.

        Returns:
            Dict: ``status_code``, ``content_type``, ``title``,
            ``meta_description``, ``headers``, ``robots_txt`` and ``sitemap``;
            or ``{"error": <message>}`` if the request or parsing fails.
        """
        logger.debug(f"Getting basic info for {url}")
        try:
            response = self.session.get(url, timeout=self._PAGE_TIMEOUT)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            # .string is None when <title> contains nested markup.
            title_text = soup.title.string if soup.title else None
            return {
                "status_code": response.status_code,
                "content_type": response.headers.get('content-type', ''),
                "title": str(title_text) if title_text else '',
                "meta_description": self._get_meta_description(soup),
                "headers": dict(response.headers),
                "robots_txt": self._get_robots_txt(url),
                "sitemap": self._get_sitemap(url)
            }
        except Exception as e:
            logger.error(f"Error getting basic info: {str(e)}", exc_info=True)
            return {"error": str(e)}

    def _check_ssl(self, domain: str) -> Dict:
        """Connect to *domain* on port 443 and describe its certificate.

        Returns:
            Dict: ``has_ssl`` True plus ``issuer``, ``expiry`` (ISO format),
            ``version`` and ``subject``; or ``has_ssl`` False with ``error``
            when the connection or handshake fails.
        """
        logger.debug(f"Checking SSL for {domain}")
        try:
            context = ssl.create_default_context()
            # Bounded connect so an unresponsive host cannot hang the analysis.
            with socket.create_connection((domain, 443), timeout=self._PAGE_TIMEOUT) as sock:
                with context.wrap_socket(sock, server_hostname=domain) as ssock:
                    cert = ssock.getpeercert()
            return {
                "has_ssl": True,
                "issuer": dict(x[0] for x in cert['issuer']),
                "expiry": datetime.strptime(cert['notAfter'], '%b %d %H:%M:%S %Y %Z').isoformat(),
                "version": cert['version'],
                "subject": dict(x[0] for x in cert['subject'])
            }
        except Exception as e:
            logger.error(f"SSL check error: {str(e)}", exc_info=True)
            return {"has_ssl": False, "error": str(e)}

    def _check_dns(self, domain: str) -> Dict:
        """Resolve A, AAAA, MX, NS and TXT records for *domain*.

        Returns:
            Dict: record type -> list of record strings (an empty list when
            there is no answer or the lookup fails); or
            ``{"error": <message>}`` on an unexpected failure.
        """
        logger.debug(f"Checking DNS for {domain}")
        try:
            records = {}
            for record_type in self._DNS_RECORD_TYPES:
                try:
                    answers = dns.resolver.resolve(domain, record_type)
                    records[record_type] = [str(rdata) for rdata in answers]
                except dns.resolver.NoAnswer:
                    records[record_type] = []
                except Exception as e:
                    logger.warning(f"Error resolving {record_type} record: {str(e)}")
                    records[record_type] = []
            return records
        except Exception as e:
            logger.error(f"DNS check error: {str(e)}", exc_info=True)
            return {"error": str(e)}

    def _get_whois_info(self, domain: str) -> Dict:
        """Get WHOIS information for a domain.

        Returns:
            Dict: ``registrar``, ``creation_date``, ``expiration_date``,
            ``updated_date``, ``name_servers``, ``domain_name`` and ``text``.
            Missing fields, or a failed lookup, fall back to ``'Unknown'``,
            an empty list, *domain* and ``''`` respectively.
        """
        try:
            w = whois.whois(domain)
            return {
                'registrar': getattr(w, 'registrar', None) or 'Unknown',
                'creation_date': self._format_whois_date(getattr(w, 'creation_date', None)),
                'expiration_date': self._format_whois_date(getattr(w, 'expiration_date', None)),
                'updated_date': self._format_whois_date(getattr(w, 'updated_date', None)),
                'name_servers': getattr(w, 'name_servers', None) or [],
                'domain_name': getattr(w, 'domain_name', None) or domain,
                'text': getattr(w, 'text', None) or ''
            }
        except Exception as e:
            logger.error(f"WHOIS check error: {str(e)}")
            return {
                'registrar': 'Unknown',
                'creation_date': 'Unknown',
                'expiration_date': 'Unknown',
                'updated_date': 'Unknown',
                'name_servers': [],
                'domain_name': domain,
                'text': ''
            }

    def _analyze_content(self, url: str) -> Dict:
        """Fetch *url* and count words, headings, images and links.

        Returns:
            Dict: ``word_count``, ``heading_count``, ``image_count``,
            ``link_count``, ``has_meta_description``, ``has_robots_txt`` and
            ``has_sitemap``; or ``{"error": <message>}`` on failure.
        """
        logger.debug(f"Analyzing content for {url}")
        try:
            response = self.session.get(url, timeout=self._PAGE_TIMEOUT)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')

            text_content = soup.get_text()
            words = re.findall(r'\w+', text_content.lower())
            headings = soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6'])
            images = soup.find_all('img')
            links = soup.find_all('a')

            return {
                "word_count": len(words),
                "heading_count": len(headings),
                "image_count": len(images),
                "link_count": len(links),
                "has_meta_description": bool(self._get_meta_description(soup)),
                "has_robots_txt": bool(self._get_robots_txt(url)),
                "has_sitemap": bool(self._get_sitemap(url))
            }
        except Exception as e:
            logger.error(f"Content analysis error: {str(e)}", exc_info=True)
            return {"error": str(e)}

    def _check_performance(self, url: str) -> Dict:
        """Time a single GET of *url*.

        Returns:
            Dict: ``load_time`` in seconds (wall clock around the request),
            ``status_code``, ``content_length`` in bytes and ``headers``; or
            ``{"error": <message>}`` on failure.
        """
        logger.debug(f"Checking performance for {url}")
        try:
            start_time = datetime.now()
            response = self.session.get(url, timeout=self._PAGE_TIMEOUT)
            end_time = datetime.now()
            load_time = (end_time - start_time).total_seconds()
            return {
                "load_time": load_time,
                "status_code": response.status_code,
                "content_length": len(response.content),
                "headers": dict(response.headers)
            }
        except Exception as e:
            logger.error(f"Performance check error: {str(e)}", exc_info=True)
            return {"error": str(e)}

    def _get_meta_description(self, soup: "BeautifulSoup") -> Optional[str]:
        """Return the ``content`` of ``<meta name="description">``, or None."""
        meta_desc = soup.find('meta', attrs={'name': 'description'})
        return meta_desc.get('content') if meta_desc else None

    def _get_robots_txt(self, url: str) -> Optional[str]:
        """Return the site's robots.txt body, or None if it is not served.

        The file is requested from the site root regardless of the path in
        *url*; any status other than 200, or a request error, yields None.
        """
        try:
            robots_url = self._root_resource_url(url, "robots.txt")
            response = self.session.get(robots_url, timeout=self._AUX_TIMEOUT)
            if response.status_code == 200:
                return response.text
        except Exception as e:
            logger.warning(f"Error fetching robots.txt: {str(e)}")
        return None

    def _get_sitemap(self, url: str) -> Optional[str]:
        """Return the body of sitemap.xml at the site root, or None.

        Any status other than 200, or a request error, yields None.
        """
        try:
            sitemap_url = self._root_resource_url(url, "sitemap.xml")
            response = self.session.get(sitemap_url, timeout=self._AUX_TIMEOUT)
            if response.status_code == 200:
                return response.text
        except Exception as e:
            logger.warning(f"Error fetching sitemap.xml: {str(e)}")
        return None