""" Google Search Console Analytics Handler Handles GSC analytics data retrieval and processing. """ from typing import Dict, Any from datetime import datetime, timedelta from loguru import logger from services.gsc_service import GSCService from ...analytics_cache_service import analytics_cache from ..models.analytics_data import AnalyticsData from ..models.platform_types import PlatformType from .base_handler import BaseAnalyticsHandler class GSCAnalyticsHandler(BaseAnalyticsHandler): """Handler for Google Search Console analytics""" def __init__(self): super().__init__(PlatformType.GSC) self.gsc_service = GSCService() async def get_analytics(self, user_id: str, target_url: str = None, start_date: str = None, end_date: str = None, **kwargs) -> AnalyticsData: """ Get Google Search Console analytics data with caching Args: user_id: User ID to get analytics for target_url: Optional URL to prefer when selecting GSC site Returns comprehensive SEO metrics including clicks, impressions, CTR, and position data. """ self.log_analytics_request(user_id, "get_analytics") # Check cache first - GSC API calls can be expensive # Include target_url and date range in cache key if provided cache_key_parts = [user_id] if target_url: cache_key_parts.append(str(target_url)) if start_date: cache_key_parts.append(str(start_date)) if end_date: cache_key_parts.append(str(end_date)) # Bump cache version to include page insights (v2) cache_key = "_".join(cache_key_parts + ['v2pages']) cached_data = analytics_cache.get('gsc_analytics', cache_key) if cached_data: logger.info("Using cached GSC analytics for user {user_id}", user_id=user_id) return AnalyticsData(**cached_data) logger.info("Fetching fresh GSC analytics for user {user_id}", user_id=user_id) try: # Get user's sites try: sites = self.gsc_service.get_site_list(user_id) except Exception as e: logger.warning(f"GSC site list fetch failed for user {user_id}: {e}") sites = [] # logger.info(f"GSC Sites found for user {user_id}: {sites}") if not sites: # logger.warning(f"No GSC sites found for user {user_id}") # Return standard empty response instead of error to avoid logs noise return self.create_success_response( metrics={"clicks": 0, "impressions": 0, "ctr": 0, "position": 0}, date_range={'start': start_date, 'end': end_date} ) # Select site: Prefer target_url match, otherwise first site selected_site = sites[0] if target_url: logger.info(f"Attempting to match target URL: {target_url}") # Normalize target URL (remove protocol, trailing slash) normalized_target = target_url.replace('https://', '').replace('http://', '').rstrip('/') for site in sites: site_url = site['siteUrl'] normalized_site = site_url.replace('https://', '').replace('http://', '').rstrip('/') if normalized_target in normalized_site or normalized_site in normalized_target: selected_site = site logger.info(f"Found matching GSC site: {site_url}") break site_url = selected_site['siteUrl'] logger.info(f"Using GSC site URL: {site_url}") # Determine date range (defaults to last 30 days) if not end_date: end_date = datetime.now().strftime('%Y-%m-%d') if not start_date: start_date = (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d') logger.info(f"GSC Date range: {start_date} to {end_date}") search_analytics = self.gsc_service.get_search_analytics( user_id=user_id, site_url=site_url, start_date=start_date, end_date=end_date ) logger.info(f"GSC Search analytics retrieved for user {user_id}") # Process GSC data into standardized format processed_metrics = self._process_gsc_metrics(search_analytics) result = self.create_success_response(metrics=processed_metrics, date_range={'start': start_date, 'end': end_date}) # Cache the result to avoid expensive API calls analytics_cache.set('gsc_analytics', cache_key, result.__dict__) logger.info("Cached GSC analytics data for user {user_id}", user_id=user_id) return result except Exception as e: self.log_analytics_error(user_id, "get_analytics", e) error_result = self.create_error_response(str(e)) # Cache error result briefly to avoid repeated failures but allow quick recovery analytics_cache.set('gsc_analytics', cache_key, error_result.__dict__, ttl_override=30) # 30 seconds return error_result def get_connection_status(self, user_id: str) -> Dict[str, Any]: """Get GSC connection status""" self.log_analytics_request(user_id, "get_connection_status") try: sites = self.gsc_service.get_site_list(user_id) return { 'connected': len(sites) > 0, 'sites_count': len(sites), 'sites': sites[:3] if sites else [], # Show first 3 sites 'error': None } except Exception as e: # self.log_analytics_error(user_id, "get_connection_status", e) return { 'connected': False, 'sites_count': 0, 'sites': [], 'error': str(e) } def _process_gsc_metrics(self, search_analytics: Dict[str, Any]) -> Dict[str, Any]: """Process GSC raw data into standardized metrics""" try: # Debug: Log the raw search analytics data structure logger.info(f"GSC Raw search analytics structure: {search_analytics}") logger.info(f"GSC Raw search analytics keys: {list(search_analytics.keys())}") # Handle new data structure with overall_metrics and query_data if 'overall_metrics' in search_analytics: # New structure from updated GSC service overall_rows = search_analytics.get('overall_metrics', {}).get('rows', []) query_rows = search_analytics.get('query_data', {}).get('rows', []) # Calculate totals from overall_rows (most accurate as it includes anonymized queries) total_clicks = 0 total_impressions = 0 total_position = 0 valid_position_rows = 0 # Use overall_rows for totals if available, otherwise fallback to query_rows calc_rows = overall_rows if overall_rows else query_rows for row in calc_rows: clicks = row.get('clicks', 0) impressions = row.get('impressions', 0) position = row.get('position', 0) total_clicks += clicks total_impressions += impressions if position and position > 0: total_position += position * impressions # Weighted average # Calculate weighted average position avg_position = total_position / total_impressions if total_impressions > 0 else 0 avg_ctr = (total_clicks / total_impressions * 100) if total_impressions > 0 else 0 # Use query_rows for top queries list top_queries_source = query_rows else: # Legacy structure rows = search_analytics.get('rows', []) # ... existing legacy logic ... calc_rows = rows top_queries_source = rows total_clicks = 0 total_impressions = 0 total_position = 0 valid_position_rows = 0 for row in calc_rows: clicks = row.get('clicks', 0) impressions = row.get('impressions', 0) position = row.get('position', 0) total_clicks += clicks total_impressions += impressions if position and position > 0: # Simple average for legacy/unknown structure if we can't do weighted total_position += position valid_position_rows += 1 avg_ctr = (total_clicks / total_impressions * 100) if total_impressions > 0 else 0 avg_position = total_position / valid_position_rows if valid_position_rows > 0 else 0 # Get top performing queries top_queries = [] if top_queries_source: # Sort by clicks sorted_queries = sorted(top_queries_source, key=lambda x: x.get('clicks', 0), reverse=True)[:10] for row in sorted_queries: clicks_val = row.get('clicks', 0) or 0 impr_val = row.get('impressions', 0) or 0 raw_ctr = row.get('ctr', None) # Calculate CTR% robustly even if 'ctr' field is missing in row if raw_ctr is not None: ctr_percent = round(float(raw_ctr) * 100, 2) else: ctr_percent = round(((clicks_val / impr_val) * 100), 2) if impr_val > 0 else 0.0 top_queries.append({ 'query': self._extract_query_from_row(row), 'clicks': clicks_val, 'impressions': impr_val, 'ctr': ctr_percent, 'position': round(row.get('position', 0) or 0, 2) }) # Prepare Top Pages from page_data when available top_pages = [] try: page_rows = search_analytics.get('page_data', {}).get('rows', []) qp_rows = search_analytics.get('query_page_data', {}).get('rows', []) # Build queries-by-page map queries_by_page: Dict[str, list] = {} if qp_rows: for r in qp_rows: keys = r.get('keys', []) if not keys or len(keys) < 2: continue query_key = keys[0]['keys'][0] if isinstance(keys[0], dict) else str(keys[0]) page_key = keys[1]['keys'][0] if isinstance(keys[1], dict) else str(keys[1]) clicks_val = r.get('clicks', 0) or 0 impr_val = r.get('impressions', 0) or 0 raw_ctr = r.get('ctr', None) if raw_ctr is not None: ctr_percent = round(float(raw_ctr) * 100, 2) else: ctr_percent = round(((clicks_val / impr_val) * 100), 2) if impr_val > 0 else 0.0 lst = queries_by_page.setdefault(page_key, []) lst.append({ 'query': query_key, 'clicks': clicks_val, 'impressions': impr_val, 'ctr': ctr_percent, }) if page_rows: sorted_pages = sorted(page_rows, key=lambda x: x.get('clicks', 0), reverse=True)[:10] for row in sorted_pages: clicks_val = row.get('clicks', 0) or 0 impr_val = row.get('impressions', 0) or 0 raw_ctr = row.get('ctr', None) if raw_ctr is not None: ctr_percent = round(float(raw_ctr) * 100, 2) else: ctr_percent = round(((clicks_val / impr_val) * 100), 2) if impr_val > 0 else 0.0 page_url = self._extract_page_from_row(row) # attach top queries pointing to this page, sorted by clicks page_queries = sorted(queries_by_page.get(page_url, []), key=lambda x: x.get('clicks', 0), reverse=True)[:5] top_pages.append({ 'page': page_url, 'clicks': clicks_val, 'impressions': impr_val, 'ctr': ctr_percent, 'position': round(row.get('position', 0) or 0, 2) if 'position' in row else None, 'queries': page_queries }) except Exception as e: logger.warning(f"Failed processing top_pages: {e}") # Detect Cannibalization (query mapping to multiple pages) cannibalization = [] try: qp_rows = search_analytics.get('query_page_data', {}).get('rows', []) q_rows = search_analytics.get('query_data', {}).get('rows', []) if qp_rows: # Determine window days for thresholding from datetime import datetime start_s = search_analytics.get('startDate') end_s = search_analytics.get('endDate') window_days = 30 try: if start_s and end_s: sd = datetime.strptime(start_s, "%Y-%m-%d") ed = datetime.strptime(end_s, "%Y-%m-%d") window_days = max((ed - sd).days + 1, 1) except Exception: pass min_clicks = 10 if window_days <= 7 else (30 if window_days <= 30 else 60) # Build map: query -> { page -> metrics } by_query: Dict[str, Dict[str, Dict[str, float]]] = {} for r in qp_rows: keys = r.get('keys', []) if not keys or len(keys) < 2: continue qk = keys[0]['keys'][0] if isinstance(keys[0], dict) else str(keys[0]) pk = keys[1]['keys'][0] if isinstance(keys[1], dict) else str(keys[1]) clicks_val = float(r.get('clicks', 0) or 0) impr_val = float(r.get('impressions', 0) or 0) raw_ctr = r.get('ctr', None) if raw_ctr is not None: ctr_percent = float(raw_ctr) * 100.0 else: ctr_percent = (clicks_val / impr_val * 100.0) if impr_val > 0 else 0.0 pos_val = float(r.get('position', 0) or 0) by_query.setdefault(qk, {}).setdefault(pk, {"clicks": 0.0, "impressions": 0.0, "ctr": 0.0, "position_sum": 0.0, "position_count": 0.0}) agg = by_query[qk][pk] agg["clicks"] += clicks_val agg["impressions"] += impr_val agg["ctr"] = max(agg["ctr"], ctr_percent) if pos_val > 0: agg["position_sum"] += pos_val agg["position_count"] += 1 # Use query totals for context total_by_query: Dict[str, Dict[str, float]] = {} for r in q_rows or []: qk = self._extract_query_from_row(r) total_by_query[qk] = { "clicks": float(r.get('clicks', 0) or 0), "impressions": float(r.get('impressions', 0) or 0), "position": float(r.get('position', 0) or 0) } for qk, pages_map in by_query.items(): if len(pages_map) < 2: continue total_clicks = sum(p["clicks"] for p in pages_map.values()) if total_clicks < min_clicks: continue qpos = total_by_query.get(qk, {}).get("position", 0.0) if not (3.0 <= qpos <= 20.0) and qpos != 0.0: # Skip queries already ranking very well or very poorly (if pos present) continue pages_list = [] for pk, m in pages_map.items(): avg_pos = (m["position_sum"] / m["position_count"]) if m["position_count"] > 0 else 0.0 pages_list.append({ "page": pk, "clicks": round(m["clicks"], 0), "impressions": round(m["impressions"], 0), "ctr": round(m["ctr"], 2), "position": round(avg_pos, 2) if avg_pos > 0 else None }) pages_list.sort(key=lambda x: x.get("clicks", 0), reverse=True) target_page = pages_list[0]["page"] if pages_list else None cannibalization.append({ "query": qk, "total_clicks": int(round(total_clicks)), "recommended_target_page": target_page, "pages": pages_list[:3] }) # Sort by impact cannibalization.sort(key=lambda item: item.get("total_clicks", 0), reverse=True) cannibalization = cannibalization[:10] except Exception as e: logger.warning(f"Failed computing cannibalization: {e}") return { 'connection_status': 'connected', 'connected_sites': 1, 'total_clicks': total_clicks, 'total_impressions': total_impressions, 'avg_ctr': round(avg_ctr, 2), 'avg_position': round(avg_position, 2), 'total_queries': len(top_queries_source) if top_queries_source else 0, 'top_queries': top_queries, 'top_pages': top_pages, 'cannibalization': cannibalization } except Exception as e: logger.error(f"Error processing GSC metrics: {e}") return { 'connection_status': 'error', 'connected_sites': 0, 'total_clicks': 0, 'total_impressions': 0, 'avg_ctr': 0, 'avg_position': 0, 'total_queries': 0, 'top_queries': [], 'top_pages': [], 'error': str(e) } def _extract_query_from_row(self, row: Dict[str, Any]) -> str: """Extract query text from GSC API row data""" try: keys = row.get('keys', []) if keys and len(keys) > 0: first_key = keys[0] if isinstance(first_key, dict): return first_key.get('keys', ['Unknown'])[0] else: return str(first_key) return 'Unknown' except Exception as e: logger.error(f"Error extracting query from row: {e}") return 'Unknown' def _extract_page_from_row(self, row: Dict[str, Any]) -> str: """Extract page URL from GSC API row data""" try: keys = row.get('keys', []) if keys and len(keys) > 0: first_key = keys[0] if isinstance(first_key, dict): return first_key.get('keys', [''])[0] else: return str(first_key) return '' except Exception as e: logger.error(f"Error extracting page from row: {e}") return ''