""" SEO Dashboard Service Main orchestration service that coordinates data fetching from GSC, Bing, and other analytics sources for the SEO dashboard. Leverages existing OAuth connections from onboarding step 5. """ from typing import Dict, Any, Optional, List, Type from datetime import datetime, timedelta from sqlalchemy.orm import Session from sqlalchemy import func from loguru import logger from utils.logger_utils import get_service_logger from services.gsc_service import GSCService from services.integrations.bing_oauth import BingOAuthService from services.bing_analytics_storage_service import BingAnalyticsStorageService from services.analytics_cache_service import AnalyticsCacheService from api.content_planning.services.content_strategy.onboarding.data_integration import OnboardingDataIntegrationService from .analytics_aggregator import AnalyticsAggregator from .competitive_analyzer import CompetitiveAnalyzer from models.onboarding import SEOPageAudit, WebsiteAnalysis, OnboardingSession from models.website_analysis_monitoring_models import ( OnboardingFullWebsiteAnalysisTask, OnboardingFullWebsiteAnalysisExecutionLog, DeepCompetitorAnalysisTask, DeepCompetitorAnalysisExecutionLog, SIFIndexingTask, SIFIndexingExecutionLog, MarketTrendsTask, MarketTrendsExecutionLog, ) from models.advertools_monitoring_models import AdvertoolsTask logger = get_service_logger("seo_dashboard") class SEODashboardService: """Main service for SEO dashboard data orchestration.""" def __init__(self, db: Session): """Initialize the SEO dashboard service.""" self.db = db self.gsc_service = GSCService() self.bing_oauth = BingOAuthService() # Bing storage is initialized per-user dynamically self.analytics_cache = AnalyticsCacheService() self.integration_service = OnboardingDataIntegrationService() self.analytics_aggregator = AnalyticsAggregator() self.competitive_analyzer = CompetitiveAnalyzer(db) def _get_bing_storage(self, user_id: str) -> BingAnalyticsStorageService: """Get Bing storage service for user.""" from services.database import get_user_db_path db_path = get_user_db_path(user_id) db_url = f"sqlite:///{db_path}" return BingAnalyticsStorageService(db_url) async def get_platform_status(self, user_id: str) -> Dict[str, Any]: """Get connection status for GSC and Bing platforms.""" try: # Check GSC connection gsc_credentials = self.gsc_service.load_user_credentials(user_id) gsc_connected = gsc_credentials is not None # Check Bing connection with detailed status bing_token_status = self.bing_oauth.get_user_token_status(user_id) bing_connected = bing_token_status.get('has_active_tokens', False) # Get cached data for last sync info gsc_data = self.analytics_cache.get('gsc_analytics', user_id) bing_data = self.analytics_cache.get('bing_analytics', user_id) return { "gsc": { "connected": gsc_connected, "sites": self._get_gsc_sites(user_id) if gsc_connected else [], "last_sync": gsc_data.get('last_updated') if gsc_data else None, "status": "connected" if gsc_connected else "disconnected" }, "bing": { "connected": bing_connected, "sites": self._get_bing_sites(user_id) if bing_connected else [], "last_sync": bing_data.get('last_updated') if bing_data else None, "status": "connected" if bing_connected else ("expired" if bing_token_status.get('has_expired_tokens') else "disconnected"), "has_expired_tokens": bing_token_status.get('has_expired_tokens', False), "last_token_date": bing_token_status.get('last_token_date'), "total_tokens": bing_token_status.get('total_tokens', 0) } } except Exception as e: logger.error(f"Error getting platform status for user {user_id}: {e}") return { "gsc": {"connected": False, "sites": [], "last_sync": None, "status": "error"}, "bing": {"connected": False, "sites": [], "last_sync": None, "status": "error"} } async def get_dashboard_overview(self, user_id: str, site_url: Optional[str] = None) -> Dict[str, Any]: """Get comprehensive dashboard overview with real GSC/Bing data.""" try: # Get user's website URL if not provided if not site_url: # Use SSOT for onboarding data onboarding_data = await self.integration_service.process_onboarding_data(user_id, self.db) website_analysis = onboarding_data.get('website_analysis', {}) if website_analysis and website_analysis.get('website_url'): site_url = website_analysis['website_url'] else: # Fallback: try to get from Bing sites bing_sites = self._get_bing_sites(user_id) if bing_sites: site_url = bing_sites[0] # Use first Bing site else: site_url = 'https://alwrity.com' # Default fallback # Get platform status platform_status = await self.get_platform_status(user_id) # Get analytics data gsc_data = await self.get_gsc_data(user_id, site_url) bing_data = await self.get_bing_data(user_id, site_url) # Aggregate metrics summary = self.analytics_aggregator.combine_metrics(gsc_data, bing_data) timeseries = self.analytics_aggregator.normalize_timeseries( gsc_data.get("timeseries", []), bing_data.get("timeseries", []) ) # Get competitive insights competitor_insights = await self.competitive_analyzer.get_competitive_insights(user_id) # Calculate health score health_score = self._calculate_health_score(summary, platform_status) # Generate AI insights ai_insights = await self._generate_ai_insights(summary, timeseries, competitor_insights) technical_seo_audit = self._get_technical_seo_audit_overview(user_id, site_url) advertools_insights = self._get_advertools_insights(user_id, site_url) return { "website_url": site_url, "platforms": platform_status, "summary": summary, "timeseries": timeseries, "competitor_insights": competitor_insights, "health_score": health_score, "ai_insights": ai_insights, "technical_seo_audit": technical_seo_audit, "advertools_insights": advertools_insights, "last_updated": datetime.now().isoformat() } except Exception as e: logger.error(f"Error getting dashboard overview for user {user_id}: {e}") raise def _get_technical_seo_audit_overview(self, user_id: str, site_url: str) -> Dict[str, Any]: site_key = (site_url or "").rstrip("/") try: q = self.db.query(SEOPageAudit).filter(SEOPageAudit.user_id == str(user_id)) if site_key: q = q.filter(SEOPageAudit.website_url.like(f"{site_key}%")) audits = q.order_by(func.coalesce(SEOPageAudit.overall_score, 1000).asc()).all() pages_audited = len(audits) scores = [a.overall_score for a in audits if isinstance(a.overall_score, int)] avg_score = round(sum(scores) / len(scores)) if scores else 0 fix_scheduled_pages = len([a for a in audits if a.status == 'fix_scheduled']) worst_pages = [ { "page_url": a.page_url, "overall_score": a.overall_score, "status": a.status, "issues_count": len(a.issues or []) if isinstance(a.issues, list) else 0 } for a in audits[:10] ] task = self.db.query(OnboardingFullWebsiteAnalysisTask).filter( OnboardingFullWebsiteAnalysisTask.user_id == str(user_id), OnboardingFullWebsiteAnalysisTask.website_url.like(f"{site_key}%") ).order_by(OnboardingFullWebsiteAnalysisTask.updated_at.desc()).first() task_status = None next_execution = None failure_pattern = None if task: task_status = task.status next_execution = task.next_execution.isoformat() if task.next_execution else None failure_pattern = task.failure_pattern return { "status": "ready" if pages_audited > 0 else ("scheduled" if task_status == "active" else "pending"), "task_status": task_status, "next_execution": next_execution, "failure_pattern": failure_pattern, "pages_audited": pages_audited, "avg_score": avg_score, "fix_scheduled_pages": fix_scheduled_pages, "worst_pages": worst_pages } except Exception as e: logger.warning(f"Failed to build technical SEO audit overview for user {user_id}: {e}") return { "status": "error", "error": str(e), "pages_audited": 0, "avg_score": 0, "fix_scheduled_pages": 0, "worst_pages": [] } async def get_onboarding_scheduled_task_health( self, user_id: str, site_url: Optional[str] = None, ) -> Dict[str, Any]: """Return consolidated health for all onboarding scheduled SEO jobs.""" site_key = (site_url or "").rstrip("/") task_matrix = { "OnboardingFullWebsiteAnalysisTask": { "label": "Onboarding Full Website Analysis", "task_model": OnboardingFullWebsiteAnalysisTask, "log_model": OnboardingFullWebsiteAnalysisExecutionLog, }, "DeepCompetitorAnalysisTask": { "label": "Deep Competitor Analysis", "task_model": DeepCompetitorAnalysisTask, "log_model": DeepCompetitorAnalysisExecutionLog, }, "SIFIndexingTask": { "label": "SIF Indexing", "task_model": SIFIndexingTask, "log_model": SIFIndexingExecutionLog, }, "MarketTrendsTask": { "label": "Market Trends", "task_model": MarketTrendsTask, "log_model": MarketTrendsExecutionLog, }, } task_health: Dict[str, Any] = {} for task_name, config in task_matrix.items(): task_health[task_name] = self._get_single_task_health( user_id=user_id, task_model=config["task_model"], log_model=config["log_model"], label=config["label"], site_key=site_key, ) return { "status": "ok", "website_url": site_key or None, "tasks": task_health, "last_updated": datetime.utcnow().isoformat(), } def _get_single_task_health( self, user_id: str, task_model: Type[Any], log_model: Type[Any], label: str, site_key: str, ) -> Dict[str, Any]: query = self.db.query(task_model).filter(task_model.user_id == str(user_id)) if site_key: query = query.filter(task_model.website_url.like(f"{site_key}%")) task = query.order_by(task_model.updated_at.desc()).first() if not task: return { "label": label, "status": "not_scheduled", "next_execution": None, "last_success": None, "last_failure": None, "consecutive_failures": 0, "latest_execution": None, } latest_log = ( self.db.query(log_model) .filter(log_model.task_id == task.id) .order_by(log_model.execution_date.desc()) .first() ) log_summary = None if latest_log: log_summary = { "status": latest_log.status, "execution_date": latest_log.execution_date.isoformat() if latest_log.execution_date else None, "execution_time_ms": latest_log.execution_time_ms, "error_message": (latest_log.error_message or "")[:500] if latest_log.error_message else None, "result_summary": self._summarize_execution_result(latest_log.result_data), } return { "label": label, "status": task.status or "not_scheduled", "next_execution": task.next_execution.isoformat() if task.next_execution else None, "last_success": task.last_success.isoformat() if task.last_success else None, "last_failure": task.last_failure.isoformat() if task.last_failure else None, "consecutive_failures": task.consecutive_failures or 0, "latest_execution": log_summary, } def _summarize_execution_result(self, result_data: Any) -> Optional[str]: if not isinstance(result_data, dict): return None for key in ("summary", "message", "status_message", "note"): value = result_data.get(key) if isinstance(value, str) and value.strip(): return value[:300] if result_data: return f"Result keys: {', '.join(sorted(result_data.keys())[:6])}" return None async def get_gsc_data(self, user_id: str, site_url: Optional[str] = None) -> Dict[str, Any]: """Get GSC data for the specified site.""" try: # Check if user has GSC credentials credentials = self.gsc_service.load_user_credentials(user_id) if not credentials: return {"error": "GSC not connected", "data": [], "status": "disconnected"} # Try to get from cache first cache_key = f"gsc_analytics:{user_id}:{site_url or 'default'}" cached_data = self.analytics_cache.get('gsc_analytics', user_id, site_url=site_url or 'default') if cached_data: return cached_data # Fetch fresh data from GSC API if site_url: gsc_data = self.gsc_service.get_search_analytics(user_id, site_url) else: # Get all sites for user sites = self._get_gsc_sites(user_id) if sites: gsc_data = self.gsc_service.get_search_analytics(user_id, sites[0]) else: return {"error": "No GSC sites found", "data": [], "status": "disconnected"} # Cache the data self.analytics_cache.set('gsc_analytics', user_id, gsc_data, ttl_override=3600, site_url=site_url or 'default') # 1 hour cache return gsc_data except Exception as e: logger.error(f"Error getting GSC data for user {user_id}: {e}") return {"error": str(e), "data": [], "status": "error"} async def get_bing_data(self, user_id: str, site_url: Optional[str] = None) -> Dict[str, Any]: """Get Bing Webmaster Tools data for the specified site.""" try: # Check if user has Bing tokens tokens = self.bing_oauth.get_user_tokens(user_id) if not tokens: return {"error": "Bing not connected", "data": [], "status": "disconnected"} # Try to get from cache first cache_key = f"bing_analytics:{user_id}:{site_url or 'default'}" cached_data = self.analytics_cache.get('bing_analytics', user_id, site_url=site_url or 'default') if cached_data: return cached_data # Get data from Bing storage service if site_url: bing_storage = self._get_bing_storage(user_id) bing_data = bing_storage.get_analytics_summary(user_id, site_url, days=30) else: # Get all sites for user sites = self._get_bing_sites(user_id) if sites: logger.info(f"Using first Bing site for analysis: {sites[0]}") bing_storage = self._get_bing_storage(user_id) bing_data = bing_storage.get_analytics_summary(user_id, sites[0], days=30) else: logger.warning(f"No Bing sites found for user {user_id}") return {"error": "No Bing sites found", "data": [], "status": "disconnected"} # Cache the data self.analytics_cache.set('bing_analytics', user_id, bing_data, ttl_override=3600, site_url=site_url or 'default') # 1 hour cache return bing_data except Exception as e: logger.error(f"Error getting Bing data for user {user_id}: {e}") return {"error": str(e), "data": [], "status": "error"} async def get_competitive_insights(self, user_id: str) -> Dict[str, Any]: """Get competitive insights from onboarding step 3 data.""" try: return await self.competitive_analyzer.get_competitive_insights(user_id) except Exception as e: logger.error(f"Error getting competitive insights for user {user_id}: {e}") return { "competitor_keywords": [], "content_gaps": [], "opportunity_score": 0 } async def refresh_analytics_data(self, user_id: str, site_url: Optional[str] = None) -> Dict[str, Any]: """Refresh analytics data by invalidating cache and fetching fresh data.""" try: # Invalidate cache cache_keys = [ f"gsc_analytics:{user_id}", f"bing_analytics:{user_id}", f"gsc_analytics:{user_id}:{site_url or 'default'}", f"bing_analytics:{user_id}:{site_url or 'default'}" ] for key in cache_keys: self.analytics_cache.delete(key) # Fetch fresh data gsc_result = await self.get_gsc_data(user_id, site_url) bing_result = await self.get_bing_data(user_id, site_url) return { "status": "success", "message": "Analytics data refreshed successfully", "last_updated": datetime.now().isoformat(), "platforms": { "gsc": {"status": "success" if "error" not in gsc_result else "error"}, "bing": {"status": "success" if "error" not in bing_result else "error"} } } except Exception as e: logger.error(f"Error refreshing analytics data for user {user_id}: {e}") return { "status": "error", "message": f"Failed to refresh analytics data: {str(e)}", "last_updated": datetime.now().isoformat() } def _get_advertools_insights(self, user_id: str, site_url: str) -> Dict[str, Any]: """Fetch Advertools-based insights from WebsiteAnalysis and AdvertoolsTasks.""" try: # 1. Get augmented persona themes from WebsiteAnalysis session = self.db.query(OnboardingSession).filter(OnboardingSession.user_id == user_id).first() if not session: return {} analysis = self.db.query(WebsiteAnalysis).filter(WebsiteAnalysis.session_id == session.id).first() # 2. Get latest tasks status tasks = self.db.query(AdvertoolsTask).filter(AdvertoolsTask.user_id == user_id).all() audit_status = "pending" health_status = "pending" for task in tasks: t_type = task.payload.get('type') if task.payload else None if t_type == 'content_audit': audit_status = task.status elif t_type == 'site_health': health_status = task.status brand_analysis = analysis.brand_analysis or {} if analysis else {} seo_audit = analysis.seo_audit or {} if analysis else {} return { "augmented_themes": brand_analysis.get('augmented_themes', []), "last_audit": brand_analysis.get('last_advertools_audit'), "site_health": seo_audit.get('site_health', {}), "last_health_check": seo_audit.get('last_advertools_health_check'), "tasks": { "content_audit": audit_status, "site_health": health_status } } except Exception as e: logger.warning(f"Failed to fetch Advertools insights for user {user_id}: {e}") return {} def _get_gsc_sites(self, user_id: str) -> List[str]: """Get GSC sites for user.""" try: credentials = self.gsc_service.load_user_credentials(user_id) if not credentials: return [] # This would need to be implemented in GSCService # For now, return empty list return [] except Exception as e: logger.error(f"Error getting GSC sites for user {user_id}: {e}") return [] def _get_bing_sites(self, user_id: str) -> List[str]: """Get Bing sites for user.""" try: # Use the existing get_user_sites method from BingOAuthService sites = self.bing_oauth.get_user_sites(user_id) if not sites: logger.warning(f"No Bing sites found for user {user_id}") return [] # Extract site URLs from the sites data site_urls = [] for site in sites: if isinstance(site, dict) and site.get('url'): site_urls.append(site['url']) elif isinstance(site, str): site_urls.append(site) logger.info(f"Found {len(site_urls)} Bing sites for user {user_id}: {site_urls}") return site_urls except Exception as e: logger.error(f"Error getting Bing sites for user {user_id}: {e}") return [] def _calculate_health_score(self, summary: Dict[str, Any], platform_status: Dict[str, Any]) -> Dict[str, Any]: """Calculate overall SEO health score.""" try: score = 0 max_score = 100 # Base score for connected platforms if platform_status.get("gsc", {}).get("connected"): score += 30 if platform_status.get("bing", {}).get("connected"): score += 20 # Traffic score (0-30) clicks = summary.get("clicks", 0) if clicks > 1000: score += 30 elif clicks > 500: score += 20 elif clicks > 100: score += 10 # CTR score (0-20) ctr = summary.get("ctr", 0) if ctr > 0.05: # 5% score += 20 elif ctr > 0.03: # 3% score += 15 elif ctr > 0.01: # 1% score += 10 # Determine trend and color if score >= 80: trend = "up" label = "EXCELLENT" color = "#4CAF50" elif score >= 60: trend = "stable" label = "GOOD" color = "#2196F3" elif score >= 40: trend = "down" label = "NEEDS IMPROVEMENT" color = "#FF9800" else: trend = "down" label = "POOR" color = "#F44336" return { "score": score, "change": 0, # Would need historical data to calculate "trend": trend, "label": label, "color": color } except Exception as e: logger.error(f"Error calculating health score: {e}") return { "score": 0, "change": 0, "trend": "unknown", "label": "UNKNOWN", "color": "#9E9E9E" } async def _generate_ai_insights(self, summary: Dict[str, Any], timeseries: List[Dict[str, Any]], competitor_insights: Dict[str, Any]) -> List[Dict[str, Any]]: """Generate AI insights from analytics data.""" try: insights = [] # Traffic insights clicks = summary.get("clicks", 0) ctr = summary.get("ctr", 0) if clicks > 0 and ctr < 0.02: # Low CTR insights.append({ "type": "opportunity", "priority": "high", "text": f"Your CTR is {ctr:.1%}, which is below average. Consider optimizing your meta descriptions and titles.", "category": "performance" }) # Competitive insights opportunity_score = competitor_insights.get("opportunity_score", 0) if opportunity_score > 70: insights.append({ "type": "opportunity", "priority": "high", "text": f"High opportunity score of {opportunity_score}% - competitors are ranking for keywords you're not targeting.", "category": "competitive" }) # Content gaps content_gaps = competitor_insights.get("content_gaps", []) if content_gaps: insights.append({ "type": "action", "priority": "medium", "text": f"Found {len(content_gaps)} content gaps. Consider creating content for these topics.", "category": "content" }) return insights except Exception as e: logger.error(f"Error generating AI insights: {e}") return []