"""Website scraping and AI analysis module.""" import asyncio from typing import Dict, List, Optional from bs4 import BeautifulSoup from urllib.parse import urljoin, urlparse import streamlit as st import re from loguru import logger from ...web_crawlers.async_web_crawler import AsyncWebCrawlerService from ...gpt_providers.text_generation.main_text_generation import llm_text_gen import os import sys import logging import json from datetime import datetime import requests import ssl import socket import whois import dns.resolver from requests.exceptions import RequestException from concurrent.futures import ThreadPoolExecutor # Configure logging logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('website_analyzer.log') ] ) logger = logging.getLogger(__name__) def analyze_website(url: str) -> Dict: """ Analyze a website and return comprehensive results. Args: url (str): The URL to analyze Returns: Dict: Analysis results including various metrics and checks """ logger.info(f"Starting website analysis for URL: {url}") try: analyzer = WebsiteAnalyzer() results = analyzer.analyze_website(url) # Add success status to results if "error" in results: return { "success": False, "error": results["error"] } # Add success status and wrap results return { "success": True, "data": results } except Exception as e: logger.error(f"Error in analyze_website: {str(e)}", exc_info=True) return { "success": False, "error": str(e) } class WebsiteAnalyzer: def __init__(self): self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' }) logger.info("WebsiteAnalyzer initialized") def analyze_website(self, url: str) -> Dict: """ Perform comprehensive analysis of a website. Args: url (str): The URL to analyze Returns: Dict: Analysis results including various metrics and checks """ logger.info(f"Starting analysis for URL: {url}") try: # Validate URL if not self._validate_url(url): logger.error(f"Invalid URL format: {url}") return {"error": "Invalid URL format"} # Basic URL parsing parsed_url = urlparse(url) domain = parsed_url.netloc logger.debug(f"Parsed domain: {domain}") # Initialize results dictionary results = { "url": url, "domain": domain, "timestamp": datetime.now().isoformat(), "analysis": {} } # Perform various analyses with ThreadPoolExecutor(max_workers=4) as executor: # Basic website info basic_info = executor.submit(self._get_basic_info, url).result() results["analysis"]["basic_info"] = basic_info # SSL/TLS info ssl_info = executor.submit(self._check_ssl, domain).result() results["analysis"]["ssl_info"] = ssl_info # DNS info dns_info = executor.submit(self._check_dns, domain).result() results["analysis"]["dns_info"] = dns_info # WHOIS info whois_info = executor.submit(self._get_whois_info, domain).result() results["analysis"]["whois_info"] = whois_info # Content analysis content_info = executor.submit(self._analyze_content, url).result() results["analysis"]["content_info"] = content_info # Performance metrics performance = executor.submit(self._check_performance, url).result() results["analysis"]["performance"] = performance logger.info(f"Analysis completed successfully for {url}") return results except Exception as e: logger.error(f"Error during website analysis: {str(e)}", exc_info=True) return {"error": str(e)} def _validate_url(self, url: str) -> bool: """Validate URL format.""" try: result = urlparse(url) return all([result.scheme, result.netloc]) except Exception as e: logger.error(f"URL validation error: {str(e)}") return False def _get_basic_info(self, url: str) -> Dict: """Get basic website information.""" logger.debug(f"Getting basic info for {url}") try: response = self.session.get(url, timeout=10) response.raise_for_status() soup = BeautifulSoup(response.text, 'html.parser') return { "status_code": response.status_code, "content_type": response.headers.get('content-type', ''), "title": soup.title.string if soup.title else '', "meta_description": self._get_meta_description(soup), "headers": dict(response.headers), "robots_txt": self._get_robots_txt(url), "sitemap": self._get_sitemap(url) } except Exception as e: logger.error(f"Error getting basic info: {str(e)}", exc_info=True) return {"error": str(e)} def _check_ssl(self, domain: str) -> Dict: """Check SSL/TLS certificate information.""" logger.debug(f"Checking SSL for {domain}") try: context = ssl.create_default_context() with socket.create_connection((domain, 443)) as sock: with context.wrap_socket(sock, server_hostname=domain) as ssock: cert = ssock.getpeercert() return { "has_ssl": True, "issuer": dict(x[0] for x in cert['issuer']), "expiry": datetime.strptime(cert['notAfter'], '%b %d %H:%M:%S %Y %Z').isoformat(), "version": cert['version'], "subject": dict(x[0] for x in cert['subject']) } except Exception as e: logger.error(f"SSL check error: {str(e)}", exc_info=True) return {"has_ssl": False, "error": str(e)} def _check_dns(self, domain: str) -> Dict: """Check DNS records.""" logger.debug(f"Checking DNS for {domain}") try: records = {} for record_type in ['A', 'AAAA', 'MX', 'NS', 'TXT']: try: answers = dns.resolver.resolve(domain, record_type) records[record_type] = [str(rdata) for rdata in answers] except dns.resolver.NoAnswer: records[record_type] = [] except Exception as e: logger.warning(f"Error resolving {record_type} record: {str(e)}") records[record_type] = [] return records except Exception as e: logger.error(f"DNS check error: {str(e)}", exc_info=True) return {"error": str(e)} def _get_whois_info(self, domain: str) -> Dict: """Get WHOIS information for a domain.""" try: w = whois.whois(domain) def format_date(date_value): if isinstance(date_value, list): return date_value[0].isoformat() if date_value else 'Unknown' return date_value.isoformat() if date_value else 'Unknown' return { 'registrar': w.registrar if hasattr(w, 'registrar') else 'Unknown', 'creation_date': format_date(w.creation_date), 'expiration_date': format_date(w.expiration_date), 'updated_date': format_date(w.updated_date) if hasattr(w, 'updated_date') else 'Unknown', 'name_servers': w.name_servers if hasattr(w, 'name_servers') else [], 'domain_name': w.domain_name if hasattr(w, 'domain_name') else domain, 'text': w.text if hasattr(w, 'text') else '' } except Exception as e: logger.error(f"WHOIS check error: {str(e)}") return { 'registrar': 'Unknown', 'creation_date': 'Unknown', 'expiration_date': 'Unknown', 'updated_date': 'Unknown', 'name_servers': [], 'domain_name': domain, 'text': '' } def _analyze_content(self, url: str) -> Dict: """Analyze website content.""" logger.debug(f"Analyzing content for {url}") try: response = self.session.get(url, timeout=10) response.raise_for_status() soup = BeautifulSoup(response.text, 'html.parser') # Get all text content text_content = soup.get_text() # Count words words = re.findall(r'\w+', text_content.lower()) word_count = len(words) # Count headings headings = soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6']) # Count images images = soup.find_all('img') # Count links links = soup.find_all('a') return { "word_count": word_count, "heading_count": len(headings), "image_count": len(images), "link_count": len(links), "has_meta_description": bool(self._get_meta_description(soup)), "has_robots_txt": bool(self._get_robots_txt(url)), "has_sitemap": bool(self._get_sitemap(url)) } except Exception as e: logger.error(f"Content analysis error: {str(e)}", exc_info=True) return {"error": str(e)} def _check_performance(self, url: str) -> Dict: """Check website performance metrics.""" logger.debug(f"Checking performance for {url}") try: start_time = datetime.now() response = self.session.get(url, timeout=10) end_time = datetime.now() load_time = (end_time - start_time).total_seconds() return { "load_time": load_time, "status_code": response.status_code, "content_length": len(response.content), "headers": dict(response.headers) } except Exception as e: logger.error(f"Performance check error: {str(e)}", exc_info=True) return {"error": str(e)} def _get_meta_description(self, soup: BeautifulSoup) -> Optional[str]: """Extract meta description from HTML.""" meta_desc = soup.find('meta', attrs={'name': 'description'}) return meta_desc.get('content') if meta_desc else None def _get_robots_txt(self, url: str) -> Optional[str]: """Get robots.txt content.""" try: robots_url = f"{url.rstrip('/')}/robots.txt" response = self.session.get(robots_url, timeout=5) if response.status_code == 200: return response.text except Exception as e: logger.warning(f"Error fetching robots.txt: {str(e)}") return None def _get_sitemap(self, url: str) -> Optional[str]: """Get sitemap.xml content.""" try: sitemap_url = f"{url.rstrip('/')}/sitemap.xml" response = self.session.get(sitemap_url, timeout=5) if response.status_code == 200: return response.text except Exception as e: logger.warning(f"Error fetching sitemap.xml: {str(e)}") return None