1506 lines
58 KiB
Python
1506 lines
58 KiB
Python
"""SEO Dashboard API endpoints for ALwrity."""
|
|
|
|
from fastapi import APIRouter, HTTPException, Depends, status
|
|
from pydantic import BaseModel, Field
|
|
from typing import Dict, Any, List, Optional
|
|
from datetime import datetime, timedelta
|
|
import json
|
|
import os
|
|
from loguru import logger
|
|
import time
|
|
|
|
# Import existing services
|
|
from services.onboarding.api_key_manager import APIKeyManager
|
|
from services.validation import check_all_api_keys
|
|
from services.seo_analyzer import ComprehensiveSEOAnalyzer, SEOAnalysisResult, SEOAnalysisService
|
|
from services.user_data_service import UserDataService
|
|
from services.database import get_db_session, get_session_for_user
|
|
from services.seo import SEODashboardService
|
|
from middleware.auth_middleware import get_current_user
|
|
from services.llm_providers.main_text_generation import llm_text_gen
|
|
from api.content_planning.services.content_strategy.onboarding import OnboardingDataIntegrationService
|
|
from models.onboarding import SEOPageAudit, WebsiteAnalysis, OnboardingSession
|
|
from sqlalchemy.orm.attributes import flag_modified
|
|
|
|
from sqlalchemy import desc
|
|
|
|
# Phase 2B: Import semantic monitoring
|
|
from services.intelligence.monitoring.semantic_dashboard import RealTimeSemanticMonitor, SemanticHealthMetric
|
|
|
|
# GSC services for keyword gap analysis
|
|
from services.gsc_service import GSCService
|
|
from services.gsc_brainstorm_service import GSCBrainstormService
|
|
|
|
# Import SIF models for guardian audit
|
|
from models.website_analysis_monitoring_models import SIFIndexingTask, SIFIndexingExecutionLog
|
|
|
|
router = APIRouter(prefix="/api/seo-dashboard", tags=["SEO Dashboard"])
|
|
|
|
# Initialize the SEO analyzer
|
|
seo_analyzer = ComprehensiveSEOAnalyzer()
|
|
|
|
# Pydantic models for SEO Dashboard
|
|
class SEOHealthScore(BaseModel):
|
|
score: int
|
|
change: int
|
|
trend: str
|
|
label: str
|
|
color: str
|
|
|
|
class SEOMetric(BaseModel):
|
|
value: float
|
|
change: float
|
|
trend: str
|
|
description: str
|
|
color: str
|
|
|
|
class PlatformStatus(BaseModel):
|
|
status: str
|
|
connected: bool
|
|
last_sync: Optional[str] = None
|
|
data_points: Optional[int] = None
|
|
|
|
class AIInsight(BaseModel):
|
|
insight: str
|
|
priority: str
|
|
category: str
|
|
action_required: bool
|
|
tool_path: Optional[str] = None
|
|
|
|
class SEODashboardData(BaseModel):
|
|
health_score: SEOHealthScore
|
|
key_insight: str
|
|
priority_alert: str
|
|
metrics: Dict[str, SEOMetric]
|
|
platforms: Dict[str, PlatformStatus]
|
|
ai_insights: List[AIInsight]
|
|
last_updated: str
|
|
website_url: Optional[str] = None # User's website URL from onboarding
|
|
|
|
# New models for comprehensive SEO analysis
|
|
class SEOAnalysisRequest(BaseModel):
|
|
url: str
|
|
target_keywords: Optional[List[str]] = None
|
|
|
|
class AnalyzeURLsRequest(BaseModel):
|
|
urls: List[str]
|
|
|
|
class SEOAnalysisResponse(BaseModel):
|
|
url: str
|
|
timestamp: datetime
|
|
overall_score: int
|
|
health_status: str
|
|
critical_issues: List[Dict[str, Any]]
|
|
warnings: List[Dict[str, Any]]
|
|
recommendations: List[Dict[str, Any]]
|
|
data: Dict[str, Any]
|
|
success: bool
|
|
message: str
|
|
|
|
class SEOMetricsResponse(BaseModel):
|
|
metrics: Dict[str, Any]
|
|
critical_issues: List[Dict[str, Any]]
|
|
warnings: List[Dict[str, Any]]
|
|
recommendations: List[Dict[str, Any]]
|
|
detailed_analysis: Dict[str, Any]
|
|
timestamp: str
|
|
url: str
|
|
|
|
# Mock data for Phase 1
|
|
def get_mock_seo_data() -> SEODashboardData:
|
|
"""Get mock SEO dashboard data for Phase 1."""
|
|
# Try to get the user's website URL from the database
|
|
website_url = None
|
|
db_session = get_db_session()
|
|
if db_session:
|
|
try:
|
|
user_data_service = UserDataService(db_session)
|
|
website_url = user_data_service.get_user_website_url()
|
|
logger.info(f"Retrieved website URL from database: {website_url}")
|
|
except Exception as e:
|
|
logger.error(f"Error fetching website URL from database: {e}")
|
|
finally:
|
|
db_session.close()
|
|
|
|
return SEODashboardData(
|
|
health_score=SEOHealthScore(
|
|
score=78,
|
|
change=12,
|
|
trend="up",
|
|
label="Good",
|
|
color="#FF9800"
|
|
),
|
|
key_insight="Your content strategy is working! Focus on technical SEO to reach 90+ score",
|
|
priority_alert="Mobile speed needs attention - 2.8s load time",
|
|
website_url=website_url, # Include the user's website URL
|
|
metrics={
|
|
"traffic": SEOMetric(
|
|
value=23450,
|
|
change=23,
|
|
trend="up",
|
|
description="Strong growth!",
|
|
color="#4CAF50"
|
|
),
|
|
"rankings": SEOMetric(
|
|
value=8,
|
|
change=8,
|
|
trend="up",
|
|
description="Great work on content",
|
|
color="#2196F3"
|
|
),
|
|
"mobile": SEOMetric(
|
|
value=2.8,
|
|
change=-0.3,
|
|
trend="down",
|
|
description="Needs attention",
|
|
color="#FF9800"
|
|
),
|
|
"keywords": SEOMetric(
|
|
value=156,
|
|
change=5,
|
|
trend="up",
|
|
description="5 new opportunities",
|
|
color="#9C27B0"
|
|
)
|
|
},
|
|
platforms={
|
|
"google_search_console": PlatformStatus(
|
|
status="excellent",
|
|
connected=True,
|
|
last_sync="2024-01-15T10:30:00Z",
|
|
data_points=1250
|
|
),
|
|
"google_analytics": PlatformStatus(
|
|
status="good",
|
|
connected=True,
|
|
last_sync="2024-01-15T10:25:00Z",
|
|
data_points=890
|
|
),
|
|
"bing_webmaster": PlatformStatus(
|
|
status="needs_attention",
|
|
connected=False,
|
|
last_sync=None,
|
|
data_points=0
|
|
)
|
|
},
|
|
ai_insights=[
|
|
AIInsight(
|
|
insight="Your mobile page speed is 2.8s - optimize images and enable compression",
|
|
priority="high",
|
|
category="performance",
|
|
action_required=True,
|
|
tool_path="/seo-tools/page-speed-optimizer"
|
|
),
|
|
AIInsight(
|
|
insight="Add structured data to improve rich snippet opportunities",
|
|
priority="medium",
|
|
category="technical",
|
|
action_required=False,
|
|
tool_path="/seo-tools/schema-generator"
|
|
),
|
|
AIInsight(
|
|
insight="Content quality score improved by 15% - great work!",
|
|
priority="low",
|
|
category="content",
|
|
action_required=False
|
|
)
|
|
],
|
|
last_updated="2024-01-15T10:30:00Z"
|
|
)
|
|
|
|
def calculate_health_score(metrics: Dict[str, Any]) -> SEOHealthScore:
|
|
"""Calculate SEO health score based on metrics."""
|
|
# This would be replaced with actual calculation logic
|
|
base_score = 75
|
|
change = 12
|
|
trend = "up"
|
|
label = "Good"
|
|
color = "#FF9800"
|
|
|
|
return SEOHealthScore(
|
|
score=base_score,
|
|
change=change,
|
|
trend=trend,
|
|
label=label,
|
|
color=color
|
|
)
|
|
|
|
def generate_ai_insights(metrics: Dict[str, Any], platforms: Dict[str, Any]) -> List[AIInsight]:
|
|
"""Generate AI-powered insights based on metrics and platform data."""
|
|
insights = []
|
|
|
|
# Performance insights
|
|
if metrics.get("mobile", {}).get("value", 0) > 2.5:
|
|
insights.append(AIInsight(
|
|
insight="Mobile page speed needs optimization - aim for under 2 seconds",
|
|
priority="high",
|
|
category="performance",
|
|
action_required=True,
|
|
tool_path="/seo-tools/page-speed-optimizer"
|
|
))
|
|
|
|
# Technical insights
|
|
if not platforms.get("google_search_console", {}).get("connected", False):
|
|
insights.append(AIInsight(
|
|
insight="Connect Google Search Console for better SEO monitoring",
|
|
priority="medium",
|
|
category="technical",
|
|
action_required=True,
|
|
tool_path="/seo-tools/search-console-setup"
|
|
))
|
|
|
|
# Content insights
|
|
if metrics.get("rankings", {}).get("change", 0) > 0:
|
|
insights.append(AIInsight(
|
|
insight="Rankings are improving - continue with current content strategy",
|
|
priority="low",
|
|
category="content",
|
|
action_required=False
|
|
))
|
|
|
|
return insights
|
|
|
|
from services.seo.deep_competitor_analysis_service import DeepCompetitorAnalysisService
|
|
|
|
# API Endpoints
|
|
async def run_strategic_insights(
|
|
current_user: dict = Depends(get_current_user)
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Manually trigger AI-Powered Competitive Insights (Weekly Strategy Brief).
|
|
"""
|
|
try:
|
|
user_id = str(current_user.get('id'))
|
|
db_session = get_db_session(user_id)
|
|
|
|
if not db_session:
|
|
raise HTTPException(status_code=500, detail="Database connection unavailable")
|
|
|
|
try:
|
|
# 1. Get Website Analysis (with fallback)
|
|
website_analysis_data = None
|
|
analysis_id = None
|
|
|
|
# Try SSOT first
|
|
integration_service = OnboardingDataIntegrationService()
|
|
integrated_data = integration_service.get_integrated_data_sync(user_id, db_session)
|
|
if integrated_data and integrated_data.get("website_analysis"):
|
|
website_analysis_data = integrated_data.get("website_analysis")
|
|
analysis_id = website_analysis_data.get("id")
|
|
|
|
# Fallback: Find latest WebsiteAnalysis across sessions
|
|
if not website_analysis_data:
|
|
latest_analysis = db_session.query(WebsiteAnalysis).join(
|
|
OnboardingSession, WebsiteAnalysis.session_id == OnboardingSession.id
|
|
).filter(
|
|
OnboardingSession.user_id == user_id
|
|
).order_by(WebsiteAnalysis.updated_at.desc()).first()
|
|
|
|
if latest_analysis:
|
|
# Convert to dict
|
|
from fastapi.encoders import jsonable_encoder
|
|
website_analysis_data = jsonable_encoder(latest_analysis)
|
|
analysis_id = latest_analysis.id
|
|
|
|
if not website_analysis_data:
|
|
raise HTTPException(status_code=400, detail="No website analysis found. Please complete Onboarding Step 2.")
|
|
|
|
# 2. Get Competitors
|
|
competitors = []
|
|
if integrated_data:
|
|
competitors = integrated_data.get("competitor_analysis", [])
|
|
|
|
if not competitors:
|
|
# Fallback to research preferences
|
|
research_prefs = integrated_data.get("research_preferences", {})
|
|
competitors = research_prefs.get("competitors", [])
|
|
|
|
if not competitors:
|
|
raise HTTPException(status_code=400, detail="No competitors found. Please complete Onboarding Step 3.")
|
|
|
|
# 3. Run Analysis
|
|
service = DeepCompetitorAnalysisService()
|
|
report = await service.generate_weekly_strategy_brief(
|
|
user_id=user_id,
|
|
website_analysis=website_analysis_data,
|
|
competitors=competitors
|
|
)
|
|
|
|
# 4. Persist to History
|
|
if analysis_id:
|
|
wa = db_session.query(WebsiteAnalysis).filter(WebsiteAnalysis.id == analysis_id).first()
|
|
if wa:
|
|
history = wa.strategic_insights_history or []
|
|
# Ensure history is a list
|
|
if not isinstance(history, list):
|
|
history = []
|
|
|
|
# Prepend new report
|
|
history.insert(0, report)
|
|
|
|
# Keep last 52 weeks
|
|
wa.strategic_insights_history = history[:52]
|
|
flag_modified(wa, "strategic_insights_history")
|
|
db_session.commit()
|
|
|
|
return report
|
|
|
|
finally:
|
|
db_session.close()
|
|
|
|
except HTTPException as he:
|
|
raise he
|
|
except Exception as e:
|
|
logger.error(f"Error running strategic insights: {e}")
|
|
raise HTTPException(status_code=500, detail=f"Failed to run analysis: {str(e)}")
|
|
|
|
async def get_seo_dashboard_data(current_user: dict = Depends(get_current_user)) -> SEODashboardData:
|
|
"""Get comprehensive SEO dashboard data."""
|
|
try:
|
|
user_id = str(current_user.get('id'))
|
|
db_session = get_db_session(user_id)
|
|
|
|
if not db_session:
|
|
logger.error("No database session available")
|
|
return get_mock_seo_data()
|
|
|
|
try:
|
|
# Use new SEO dashboard service
|
|
dashboard_service = SEODashboardService(db_session)
|
|
overview_data = await dashboard_service.get_dashboard_overview(user_id)
|
|
|
|
# Convert to SEODashboardData format
|
|
return SEODashboardData(
|
|
health_score=SEOHealthScore(**overview_data.get("health_score", {})),
|
|
key_insight=overview_data.get("key_insight", "Connect your analytics accounts for personalized insights"),
|
|
priority_alert=overview_data.get("priority_alert", "No alerts at this time"),
|
|
metrics=_convert_metrics(overview_data.get("summary", {})),
|
|
platforms=_convert_platforms(overview_data.get("platforms", {})),
|
|
ai_insights=[AIInsight(**insight) for insight in overview_data.get("ai_insights", [])],
|
|
last_updated=overview_data.get("last_updated", datetime.now().isoformat()),
|
|
website_url=overview_data.get("website_url")
|
|
)
|
|
finally:
|
|
db_session.close()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting SEO dashboard data: {e}")
|
|
# Fallback to mock data
|
|
return get_mock_seo_data()
|
|
|
|
async def get_seo_health_score(current_user: dict = Depends(get_current_user)) -> SEOHealthScore:
|
|
"""Get current SEO health score."""
|
|
try:
|
|
user_id = str(current_user.get('id'))
|
|
db_session = get_db_session(user_id)
|
|
|
|
if not db_session:
|
|
raise HTTPException(status_code=500, detail="Database connection unavailable")
|
|
|
|
try:
|
|
dashboard_service = SEODashboardService(db_session)
|
|
overview_data = await dashboard_service.get_dashboard_overview(user_id)
|
|
health_score_data = overview_data.get("health_score", {})
|
|
return SEOHealthScore(**health_score_data)
|
|
finally:
|
|
db_session.close()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting SEO health score: {e}")
|
|
raise HTTPException(status_code=500, detail="Failed to get SEO health score")
|
|
|
|
async def get_seo_metrics(current_user: dict = Depends(get_current_user)) -> Dict[str, SEOMetric]:
|
|
"""Get SEO metrics."""
|
|
try:
|
|
user_id = str(current_user.get('id'))
|
|
db_session = get_db_session(user_id)
|
|
|
|
if not db_session:
|
|
raise HTTPException(status_code=500, detail="Database connection unavailable")
|
|
|
|
try:
|
|
dashboard_service = SEODashboardService(db_session)
|
|
overview_data = await dashboard_service.get_dashboard_overview(user_id)
|
|
summary_data = overview_data.get("summary", {})
|
|
return _convert_metrics(summary_data)
|
|
finally:
|
|
db_session.close()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting SEO metrics: {e}")
|
|
raise HTTPException(status_code=500, detail="Failed to get SEO metrics")
|
|
|
|
async def get_platform_status(
|
|
current_user: dict = Depends(get_current_user)
|
|
) -> Dict[str, Any]:
|
|
"""Get platform connection status."""
|
|
try:
|
|
user_id = str(current_user.get('id'))
|
|
db_session = get_db_session(user_id)
|
|
|
|
if not db_session:
|
|
logger.error("No database session available")
|
|
raise HTTPException(status_code=500, detail="Database connection failed")
|
|
|
|
try:
|
|
# Use SEO dashboard service to get platform status
|
|
dashboard_service = SEODashboardService(db_session)
|
|
platform_status = await dashboard_service.get_platform_status(user_id)
|
|
|
|
logger.info(f"Retrieved platform status for user {user_id}")
|
|
return platform_status
|
|
|
|
finally:
|
|
db_session.close()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting platform status: {e}")
|
|
raise HTTPException(status_code=500, detail="Failed to get platform status")
|
|
|
|
async def get_ai_insights(current_user: dict = Depends(get_current_user)) -> List[AIInsight]:
|
|
"""Get AI-generated insights."""
|
|
try:
|
|
user_id = str(current_user.get('id'))
|
|
db_session = get_db_session(user_id)
|
|
|
|
if not db_session:
|
|
raise HTTPException(status_code=500, detail="Database connection unavailable")
|
|
|
|
try:
|
|
dashboard_service = SEODashboardService(db_session)
|
|
overview_data = await dashboard_service.get_dashboard_overview(user_id)
|
|
ai_insights_data = overview_data.get("ai_insights", [])
|
|
return [AIInsight(**insight) for insight in ai_insights_data]
|
|
finally:
|
|
db_session.close()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting AI insights: {e}")
|
|
raise HTTPException(status_code=500, detail="Failed to get AI insights")
|
|
|
|
async def seo_dashboard_health_check():
|
|
"""Health check for SEO dashboard."""
|
|
return {"status": "healthy", "service": "SEO Dashboard API"}
|
|
|
|
# Phase 2B: Semantic health monitoring endpoint
|
|
async def get_semantic_health(current_user: dict = Depends(get_current_user)) -> SemanticHealthMetric:
|
|
"""
|
|
Get the canonical semantic health summary for the user's content and competitors.
|
|
This endpoint provides Phase 2B semantic intelligence monitoring data.
|
|
|
|
Returns:
|
|
SemanticHealthMetric with health status, aggregate score, and recommendations.
|
|
"""
|
|
try:
|
|
user_id = str(current_user.get('id'))
|
|
|
|
# Initialize semantic monitor for this user
|
|
semantic_monitor = RealTimeSemanticMonitor(user_id)
|
|
|
|
# Get current semantic health (will use cache if available)
|
|
semantic_health: SemanticHealthMetric = await semantic_monitor.check_semantic_health(user_id)
|
|
|
|
logger.info(f"[Semantic Health API] Retrieved health data for user {user_id}: {semantic_health.status} (score: {semantic_health.value:.2f})")
|
|
|
|
return semantic_health
|
|
|
|
except Exception as e:
|
|
logger.error(f"[Semantic Health API] Error retrieving semantic health for user: {e}")
|
|
# Return a default healthy state with warning message
|
|
return SemanticHealthMetric(
|
|
metric_name="semantic_health",
|
|
value=0.5,
|
|
threshold=0.6,
|
|
status="warning",
|
|
timestamp=datetime.utcnow().isoformat(),
|
|
description="Semantic monitoring temporarily unavailable",
|
|
recommendations=["Please try again later", "Check system status"]
|
|
)
|
|
|
|
|
|
async def get_semantic_cache_stats(current_user: dict = Depends(get_current_user)) -> Dict[str, Any]:
|
|
"""
|
|
Get statistics for the semantic cache.
|
|
"""
|
|
try:
|
|
user_id = str(current_user.get('id'))
|
|
# Initialize semantic monitor to access its cache manager
|
|
semantic_monitor = RealTimeSemanticMonitor(user_id)
|
|
return await semantic_monitor.get_cache_stats()
|
|
except Exception as e:
|
|
logger.error(f"[Semantic Cache API] Error retrieving cache stats: {e}")
|
|
return {
|
|
"error": "Failed to retrieve cache statistics",
|
|
"hit_rate": 0.0,
|
|
"memory_usage_mb": 0.0
|
|
}
|
|
|
|
|
|
async def get_sif_indexing_health(current_user: dict = Depends(get_current_user)) -> Dict[str, Any]:
|
|
try:
|
|
user_id = str(current_user.get("id"))
|
|
db_session = get_session_for_user(user_id)
|
|
if not db_session:
|
|
raise HTTPException(status_code=500, detail="Database connection unavailable")
|
|
|
|
try:
|
|
dashboard_service = SEODashboardService(db_session)
|
|
onboarding_task_health = await dashboard_service.get_onboarding_scheduled_task_health(user_id)
|
|
sif_health = onboarding_task_health.get("tasks", {}).get("SIFIndexingTask", {})
|
|
|
|
if sif_health.get("status") == "not_scheduled":
|
|
return {
|
|
"has_task": False,
|
|
"status": "not_scheduled",
|
|
"message": "SIF indexing task not yet scheduled for this website.",
|
|
}
|
|
|
|
overall_status = "healthy"
|
|
if (sif_health.get("consecutive_failures") or 0) > 0:
|
|
overall_status = "warning"
|
|
if sif_health.get("status") in {"failed", "needs_intervention"}:
|
|
overall_status = "critical"
|
|
|
|
return {
|
|
"has_task": True,
|
|
"status": overall_status,
|
|
"task": {
|
|
"raw_status": sif_health.get("status"),
|
|
"next_execution": sif_health.get("next_execution"),
|
|
"last_success": sif_health.get("last_success"),
|
|
"last_failure": sif_health.get("last_failure"),
|
|
"consecutive_failures": sif_health.get("consecutive_failures") or 0,
|
|
},
|
|
"last_run": {
|
|
"status": sif_health.get("latest_execution", {}).get("status"),
|
|
"time": sif_health.get("latest_execution", {}).get("execution_date"),
|
|
"error_message": sif_health.get("latest_execution", {}).get("error_message"),
|
|
},
|
|
}
|
|
finally:
|
|
db_session.close()
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Failed to get SIF indexing health: {e}")
|
|
raise HTTPException(status_code=500, detail="Failed to get SIF indexing health")
|
|
|
|
|
|
async def get_guardian_audit(current_user: dict = Depends(get_current_user)) -> Dict[str, Any]:
|
|
"""
|
|
Get the latest Content Guardian audit report for the current user.
|
|
Returns audit data (quality, brand voice, safety, cannibalization) or a
|
|
null-state response if no audit has been performed yet.
|
|
"""
|
|
try:
|
|
user_id = str(current_user.get("id"))
|
|
db_session = get_session_for_user(user_id)
|
|
if not db_session:
|
|
raise HTTPException(status_code=500, detail="Database connection unavailable")
|
|
|
|
try:
|
|
# Find the most recent SIF indexing task for this user
|
|
task = (
|
|
db_session.query(SIFIndexingTask)
|
|
.filter(SIFIndexingTask.user_id == user_id)
|
|
.order_by(desc(SIFIndexingTask.created_at))
|
|
.first()
|
|
)
|
|
|
|
if not task:
|
|
return {
|
|
"has_audit": False,
|
|
"status": "not_available",
|
|
"message": "No SIF indexing task found. Onboarding may not be complete.",
|
|
}
|
|
|
|
# Get the latest execution log with a guardian report
|
|
log = (
|
|
db_session.query(SIFIndexingExecutionLog)
|
|
.filter(
|
|
SIFIndexingExecutionLog.task_id == task.id,
|
|
SIFIndexingExecutionLog.result_data.isnot(None),
|
|
)
|
|
.order_by(desc(SIFIndexingExecutionLog.execution_date))
|
|
.first()
|
|
)
|
|
|
|
if not log or not log.result_data:
|
|
return {
|
|
"has_audit": False,
|
|
"status": "pending",
|
|
"message": "SIF indexing has not completed a run yet.",
|
|
}
|
|
|
|
guardian_report = log.result_data.get("guardian_report")
|
|
if not guardian_report:
|
|
return {
|
|
"has_audit": False,
|
|
"status": "no_report",
|
|
"message": "Guardian audit was not performed on the last indexing run.",
|
|
}
|
|
|
|
return {
|
|
"has_audit": True,
|
|
"status": "available",
|
|
"audit_timestamp": guardian_report.get("audit_timestamp"),
|
|
"website_url": guardian_report.get("website_url"),
|
|
"total_pages_crawled": guardian_report.get("total_pages_crawled", 0),
|
|
"content_quality": guardian_report.get("content_quality"),
|
|
"brand_voice_consistency": guardian_report.get("brand_voice_consistency"),
|
|
"safety_issues": guardian_report.get("safety_issues"),
|
|
"cannibalization_issues": guardian_report.get("cannibalization_issues"),
|
|
"last_execution_time": log.execution_date.isoformat() if log.execution_date else None,
|
|
}
|
|
finally:
|
|
db_session.close()
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Failed to get guardian audit: {e}")
|
|
raise HTTPException(status_code=500, detail="Failed to get guardian audit")
|
|
|
|
|
|
async def get_keyword_gaps(
|
|
current_user: dict = Depends(get_current_user),
|
|
site_url: Optional[str] = None,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Get keyword gap analysis from GSC data.
|
|
Returns keyword gaps, quick wins, content opportunities, and page-level opportunities
|
|
derived from the user's Google Search Console search analytics (last 30 days).
|
|
"""
|
|
try:
|
|
user_id = str(current_user.get("id"))
|
|
|
|
gsc_service = GSCService()
|
|
brainstorm_service = GSCBrainstormService(gsc_service)
|
|
|
|
# Resolve site URL
|
|
if not site_url:
|
|
sites = gsc_service.get_site_list(user_id)
|
|
if not sites:
|
|
return {
|
|
"error": "No GSC sites found. Connect Google Search Console first.",
|
|
"keyword_gaps": [],
|
|
"quick_wins": [],
|
|
"content_opportunities": [],
|
|
"page_opportunities": [],
|
|
"summary": {},
|
|
}
|
|
site_url = sites[0].get("siteUrl", "")
|
|
|
|
# Fetch GSC analytics (last 30 days)
|
|
end_date = datetime.now().strftime("%Y-%m-%d")
|
|
start_date = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d")
|
|
|
|
analytics = gsc_service.get_search_analytics(
|
|
user_id=user_id,
|
|
site_url=site_url,
|
|
start_date=start_date,
|
|
end_date=end_date,
|
|
)
|
|
|
|
if "error" in analytics:
|
|
return {
|
|
"error": analytics.get("error", "Failed to fetch GSC data"),
|
|
"keyword_gaps": [],
|
|
"quick_wins": [],
|
|
"content_opportunities": [],
|
|
"page_opportunities": [],
|
|
"summary": {},
|
|
}
|
|
|
|
query_rows = analytics.get("query_data", {}).get("rows", [])
|
|
page_rows = analytics.get("page_data", {}).get("rows", [])
|
|
|
|
keywords_data = GSCBrainstormService._parse_query_rows(query_rows)
|
|
pages_data = GSCBrainstormService._parse_page_rows(page_rows)
|
|
|
|
if not keywords_data:
|
|
return {
|
|
"error": "No keyword data available for the last 30 days.",
|
|
"keyword_gaps": [],
|
|
"quick_wins": [],
|
|
"content_opportunities": [],
|
|
"page_opportunities": [],
|
|
"summary": {
|
|
"site_url": site_url,
|
|
"date_range": {"start": start_date, "end": end_date},
|
|
"total_keywords_analyzed": 0,
|
|
},
|
|
}
|
|
|
|
# Run rule-based analysis WITHOUT topic filter (site-wide)
|
|
content_opportunities = GSCBrainstormService._identify_content_opportunities(keywords_data)
|
|
keyword_gaps = GSCBrainstormService._identify_keyword_gaps(keywords_data)
|
|
quick_wins = GSCBrainstormService._identify_quick_wins(keywords_data)
|
|
page_opportunities = GSCBrainstormService._identify_page_opportunities(pages_data)
|
|
summary = GSCBrainstormService._compute_summary(
|
|
keywords_data, pages_data, site_url, start_date, end_date
|
|
)
|
|
|
|
return {
|
|
"keyword_gaps": keyword_gaps,
|
|
"quick_wins": quick_wins,
|
|
"content_opportunities": content_opportunities,
|
|
"page_opportunities": page_opportunities,
|
|
"summary": summary,
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Failed to get keyword gaps: {e}")
|
|
raise HTTPException(status_code=500, detail=f"Failed to get keyword gaps: {str(e)}")
|
|
|
|
|
|
async def get_onboarding_task_health(
|
|
current_user: dict = Depends(get_current_user),
|
|
site_url: Optional[str] = None,
|
|
) -> Dict[str, Any]:
|
|
"""Get consolidated onboarding scheduled SEO task health."""
|
|
try:
|
|
user_id = str(current_user.get("id"))
|
|
db_session = get_session_for_user(user_id)
|
|
if not db_session:
|
|
raise HTTPException(status_code=500, detail="Database connection unavailable")
|
|
|
|
try:
|
|
dashboard_service = SEODashboardService(db_session)
|
|
return await dashboard_service.get_onboarding_scheduled_task_health(user_id, site_url)
|
|
finally:
|
|
db_session.close()
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Failed to get onboarding task health: {e}")
|
|
raise HTTPException(status_code=500, detail="Failed to get onboarding scheduled task health")
|
|
|
|
# New comprehensive SEO analysis endpoints
|
|
async def analyze_seo_comprehensive(request: SEOAnalysisRequest) -> SEOAnalysisResponse:
|
|
"""
|
|
Analyze a URL for comprehensive SEO performance (progressive mode)
|
|
|
|
Args:
|
|
request: SEOAnalysisRequest containing URL and optional target keywords
|
|
|
|
Returns:
|
|
SEOAnalysisResponse with detailed analysis results
|
|
"""
|
|
try:
|
|
logger.info(f"Starting progressive SEO analysis for URL: {request.url}")
|
|
|
|
# Use progressive analysis for comprehensive results with timeout handling
|
|
result = seo_analyzer.analyze_url_progressive(request.url, request.target_keywords)
|
|
|
|
# Store result in database
|
|
db_session = get_db_session()
|
|
if db_session:
|
|
try:
|
|
seo_service = SEOAnalysisService(db_session)
|
|
stored_analysis = seo_service.store_analysis_result(result)
|
|
if stored_analysis:
|
|
logger.info(f"Stored progressive SEO analysis in database with ID: {stored_analysis.id}")
|
|
else:
|
|
logger.warning("Failed to store SEO analysis in database")
|
|
except Exception as db_error:
|
|
logger.error(f"Database error during analysis storage: {str(db_error)}")
|
|
finally:
|
|
db_session.close()
|
|
|
|
# Convert to response format
|
|
response_data = {
|
|
'url': result.url,
|
|
'timestamp': result.timestamp,
|
|
'overall_score': result.overall_score,
|
|
'health_status': result.health_status,
|
|
'critical_issues': result.critical_issues,
|
|
'warnings': result.warnings,
|
|
'recommendations': result.recommendations,
|
|
'data': result.data,
|
|
'success': True,
|
|
'message': f"Progressive SEO analysis completed successfully for {result.url}"
|
|
}
|
|
|
|
logger.info(f"Progressive SEO analysis completed for {request.url}. Overall score: {result.overall_score}")
|
|
return SEOAnalysisResponse(**response_data)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error analyzing SEO for {request.url}: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Error analyzing SEO: {str(e)}"
|
|
)
|
|
|
|
async def analyze_seo_full(request: SEOAnalysisRequest) -> SEOAnalysisResponse:
|
|
"""
|
|
Analyze a URL for comprehensive SEO performance (full analysis)
|
|
|
|
Args:
|
|
request: SEOAnalysisRequest containing URL and optional target keywords
|
|
|
|
Returns:
|
|
SEOAnalysisResponse with detailed analysis results
|
|
"""
|
|
try:
|
|
logger.info(f"Starting full SEO analysis for URL: {request.url}")
|
|
|
|
# Use progressive analysis for comprehensive results
|
|
result = seo_analyzer.analyze_url_progressive(request.url, request.target_keywords)
|
|
|
|
# Store result in database
|
|
db_session = get_db_session()
|
|
if db_session:
|
|
try:
|
|
seo_service = SEOAnalysisService(db_session)
|
|
stored_analysis = seo_service.store_analysis_result(result)
|
|
if stored_analysis:
|
|
logger.info(f"Stored full SEO analysis in database with ID: {stored_analysis.id}")
|
|
else:
|
|
logger.warning("Failed to store SEO analysis in database")
|
|
except Exception as db_error:
|
|
logger.error(f"Database error during analysis storage: {str(db_error)}")
|
|
finally:
|
|
db_session.close()
|
|
|
|
# Convert to response format
|
|
response_data = {
|
|
'url': result.url,
|
|
'timestamp': result.timestamp,
|
|
'overall_score': result.overall_score,
|
|
'health_status': result.health_status,
|
|
'critical_issues': result.critical_issues,
|
|
'warnings': result.warnings,
|
|
'recommendations': result.recommendations,
|
|
'data': result.data,
|
|
'success': True,
|
|
'message': f"Full SEO analysis completed successfully for {result.url}"
|
|
}
|
|
|
|
logger.info(f"Full SEO analysis completed for {request.url}. Overall score: {result.overall_score}")
|
|
return SEOAnalysisResponse(**response_data)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in full SEO analysis for {request.url}: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Error in full SEO analysis: {str(e)}"
|
|
)
|
|
|
|
async def get_seo_metrics_detailed(url: str) -> SEOMetricsResponse:
|
|
"""
|
|
Get detailed SEO metrics for dashboard display
|
|
|
|
Args:
|
|
url: The URL to analyze
|
|
|
|
Returns:
|
|
Detailed SEO metrics for React dashboard
|
|
"""
|
|
try:
|
|
# Ensure URL has protocol
|
|
if not url.startswith(('http://', 'https://')):
|
|
url = f"https://{url}"
|
|
|
|
logger.info(f"Getting detailed SEO metrics for URL: {url}")
|
|
|
|
# Perform analysis
|
|
result = seo_analyzer.analyze_url_progressive(url)
|
|
|
|
# Extract metrics for dashboard
|
|
metrics = {
|
|
"overall_score": result.overall_score,
|
|
"health_status": result.health_status,
|
|
"url_structure_score": result.data.get('url_structure', {}).get('score', 0),
|
|
"meta_data_score": result.data.get('meta_data', {}).get('score', 0),
|
|
"content_score": result.data.get('content_analysis', {}).get('score', 0),
|
|
"technical_score": result.data.get('technical_seo', {}).get('score', 0),
|
|
"performance_score": result.data.get('performance', {}).get('score', 0),
|
|
"accessibility_score": result.data.get('accessibility', {}).get('score', 0),
|
|
"user_experience_score": result.data.get('user_experience', {}).get('score', 0),
|
|
"security_score": result.data.get('security_headers', {}).get('score', 0)
|
|
}
|
|
|
|
# Add detailed data for each category
|
|
dashboard_data = {
|
|
"metrics": metrics,
|
|
"critical_issues": result.critical_issues,
|
|
"warnings": result.warnings,
|
|
"recommendations": result.recommendations,
|
|
"detailed_analysis": {
|
|
"url_structure": result.data.get('url_structure', {}),
|
|
"meta_data": result.data.get('meta_data', {}),
|
|
"content_analysis": result.data.get('content_analysis', {}),
|
|
"technical_seo": result.data.get('technical_seo', {}),
|
|
"performance": result.data.get('performance', {}),
|
|
"accessibility": result.data.get('accessibility', {}),
|
|
"user_experience": result.data.get('user_experience', {}),
|
|
"security_headers": result.data.get('security_headers', {}),
|
|
"keyword_analysis": result.data.get('keyword_analysis', {})
|
|
},
|
|
"timestamp": result.timestamp.isoformat(),
|
|
"url": result.url
|
|
}
|
|
|
|
logger.info(f"Detailed SEO metrics retrieved for {url}")
|
|
return SEOMetricsResponse(**dashboard_data)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting SEO metrics for {url}: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Error getting SEO metrics: {str(e)}"
|
|
)
|
|
|
|
async def get_analysis_summary(url: str) -> Dict[str, Any]:
|
|
"""
|
|
Get a quick summary of SEO analysis for a URL
|
|
|
|
Args:
|
|
url: The URL to analyze
|
|
|
|
Returns:
|
|
Summary of SEO analysis
|
|
"""
|
|
try:
|
|
# Ensure URL has protocol
|
|
if not url.startswith(('http://', 'https://')):
|
|
url = f"https://{url}"
|
|
|
|
logger.info(f"Getting analysis summary for URL: {url}")
|
|
|
|
# Perform analysis
|
|
result = seo_analyzer.analyze_url_progressive(url)
|
|
|
|
# Create summary
|
|
summary = {
|
|
"url": result.url,
|
|
"overall_score": result.overall_score,
|
|
"health_status": result.health_status,
|
|
"critical_issues_count": len(result.critical_issues),
|
|
"warnings_count": len(result.warnings),
|
|
"recommendations_count": len(result.recommendations),
|
|
"top_issues": result.critical_issues[:3],
|
|
"top_recommendations": result.recommendations[:3],
|
|
"analysis_timestamp": result.timestamp.isoformat()
|
|
}
|
|
|
|
logger.info(f"Analysis summary retrieved for {url}")
|
|
return summary
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting analysis summary for {url}: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Error getting analysis summary: {str(e)}"
|
|
)
|
|
|
|
async def batch_analyze_urls(urls: List[str]) -> Dict[str, Any]:
|
|
"""
|
|
Analyze multiple URLs in batch
|
|
|
|
Args:
|
|
urls: List of URLs to analyze
|
|
|
|
Returns:
|
|
Batch analysis results
|
|
"""
|
|
try:
|
|
logger.info(f"Starting batch analysis for {len(urls)} URLs")
|
|
|
|
results = []
|
|
|
|
for url in urls:
|
|
try:
|
|
# Ensure URL has protocol
|
|
if not url.startswith(('http://', 'https://')):
|
|
url = f"https://{url}"
|
|
|
|
# Perform analysis
|
|
result = seo_analyzer.analyze_url_progressive(url)
|
|
|
|
# Add to results
|
|
results.append({
|
|
"url": result.url,
|
|
"overall_score": result.overall_score,
|
|
"health_status": result.health_status,
|
|
"critical_issues_count": len(result.critical_issues),
|
|
"warnings_count": len(result.warnings),
|
|
"success": True
|
|
})
|
|
|
|
except Exception as e:
|
|
# Add error result
|
|
results.append({
|
|
"url": url,
|
|
"overall_score": 0,
|
|
"health_status": "error",
|
|
"critical_issues_count": 0,
|
|
"warnings_count": 0,
|
|
"success": False,
|
|
"error": str(e)
|
|
})
|
|
|
|
batch_result = {
|
|
"total_urls": len(urls),
|
|
"successful_analyses": len([r for r in results if r['success']]),
|
|
"failed_analyses": len([r for r in results if not r['success']]),
|
|
"results": results
|
|
}
|
|
|
|
logger.info(f"Batch analysis completed. Success: {batch_result['successful_analyses']}/{len(urls)}")
|
|
return batch_result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in batch analysis: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Error in batch analysis: {str(e)}"
|
|
)
|
|
|
|
async def analyze_urls_ai(request: AnalyzeURLsRequest, current_user: dict) -> Dict[str, Any]:
|
|
"""Run AI analysis on selected URLs."""
|
|
user_id = str(current_user.get('id'))
|
|
db_session = get_db_session()
|
|
results = []
|
|
|
|
try:
|
|
for url in request.urls:
|
|
# Check if audit exists
|
|
audit = db_session.query(SEOPageAudit).filter(
|
|
SEOPageAudit.user_id == user_id,
|
|
SEOPageAudit.page_url == url
|
|
).first()
|
|
|
|
if not audit:
|
|
results.append({"url": url, "status": "skipped", "reason": "No audit found"})
|
|
continue
|
|
|
|
# Prepare Prompt
|
|
# We use the existing audit data (algorithmic) to feed the AI
|
|
audit_summary = {
|
|
"score": audit.overall_score,
|
|
"issues": audit.issues,
|
|
"warnings": audit.warnings
|
|
}
|
|
|
|
prompt = f"""
|
|
As an expert SEO consultant, analyze these technical audit results for the page: {url}
|
|
|
|
AUDIT DATA:
|
|
{json.dumps(audit_summary, default=str)[:3000]}
|
|
|
|
TASK:
|
|
Provide 3 specific, high-impact AI recommendations to improve this page's SEO.
|
|
Focus on content relevance, user intent, and semantic SEO, which the algorithmic audit might miss.
|
|
|
|
OUTPUT JSON format:
|
|
[
|
|
{{ "category": "Content|Technical|UX", "recommendation": "...", "impact": "High|Medium", "effort": "Low|Medium" }}
|
|
]
|
|
"""
|
|
|
|
try:
|
|
ai_response = llm_text_gen(prompt, user_id=user_id)
|
|
# Parse JSON
|
|
import re
|
|
cleaned = ai_response.strip().replace("```json", "").replace("```", "")
|
|
# Simple regex to find the JSON array if extra text exists
|
|
match = re.search(r'\[.*\]', cleaned, re.DOTALL)
|
|
if match:
|
|
cleaned = match.group(0)
|
|
|
|
recommendations = json.loads(cleaned)
|
|
|
|
# Update audit
|
|
current_recs = audit.recommendations or []
|
|
if isinstance(current_recs, list):
|
|
# Tag new ones
|
|
for r in recommendations:
|
|
r['source'] = 'ai_on_demand'
|
|
current_recs.extend(recommendations)
|
|
audit.recommendations = current_recs
|
|
|
|
audit.last_analyzed_at = datetime.utcnow()
|
|
results.append({"url": url, "status": "success"})
|
|
|
|
except Exception as e:
|
|
logger.error(f"AI Analysis failed for {url}: {e}")
|
|
results.append({"url": url, "status": "failed", "error": str(e)})
|
|
|
|
db_session.commit()
|
|
return {"results": results}
|
|
|
|
finally:
|
|
db_session.close()
|
|
|
|
async def get_analyzed_pages(current_user: dict = Depends(get_current_user)) -> Dict[str, Any]:
|
|
"""Get list of pages that have been analyzed by AI."""
|
|
user_id = str(current_user.get('id'))
|
|
db_session = get_db_session()
|
|
|
|
try:
|
|
audits = db_session.query(SEOPageAudit).filter(
|
|
SEOPageAudit.user_id == user_id
|
|
).all()
|
|
|
|
results = []
|
|
for audit in audits:
|
|
if audit.recommendations:
|
|
results.append({
|
|
"url": audit.page_url,
|
|
"analyzed_at": audit.last_analyzed_at,
|
|
"score": audit.overall_score,
|
|
"recommendations_count": len(audit.recommendations)
|
|
})
|
|
|
|
return {"results": results}
|
|
finally:
|
|
db_session.close()
|
|
|
|
|
|
# New SEO Dashboard Endpoints with Real Data
|
|
|
|
async def get_seo_dashboard_overview(
|
|
current_user: dict = Depends(get_current_user),
|
|
site_url: Optional[str] = None
|
|
) -> Dict[str, Any]:
|
|
"""Get comprehensive SEO dashboard overview with real GSC/Bing data."""
|
|
try:
|
|
user_id = str(current_user.get('id'))
|
|
db_session = get_session_for_user(user_id)
|
|
|
|
if not db_session:
|
|
logger.error("No database session available")
|
|
raise HTTPException(status_code=500, detail="Database connection failed")
|
|
|
|
try:
|
|
# Use SEO dashboard service to get real data
|
|
dashboard_service = SEODashboardService(db_session)
|
|
overview_data = await dashboard_service.get_dashboard_overview(user_id, site_url)
|
|
|
|
logger.info(f"Retrieved SEO dashboard overview for user {user_id}")
|
|
return overview_data
|
|
|
|
finally:
|
|
db_session.close()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting SEO dashboard overview: {e}")
|
|
raise HTTPException(status_code=500, detail="Failed to get dashboard overview")
|
|
|
|
async def get_gsc_raw_data(
|
|
current_user: dict = Depends(get_current_user),
|
|
site_url: Optional[str] = None
|
|
) -> Dict[str, Any]:
|
|
"""Get raw GSC data for the specified site."""
|
|
try:
|
|
user_id = str(current_user.get('id'))
|
|
db_session = get_db_session()
|
|
|
|
if not db_session:
|
|
logger.error("No database session available")
|
|
raise HTTPException(status_code=500, detail="Database connection failed")
|
|
|
|
try:
|
|
# Use SEO dashboard service to get GSC data
|
|
dashboard_service = SEODashboardService(db_session)
|
|
gsc_data = await dashboard_service.get_gsc_data(user_id, site_url)
|
|
|
|
logger.info(f"Retrieved GSC raw data for user {user_id}")
|
|
return gsc_data
|
|
|
|
finally:
|
|
db_session.close()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting GSC raw data: {e}")
|
|
raise HTTPException(status_code=500, detail="Failed to get GSC data")
|
|
|
|
async def get_bing_raw_data(
|
|
current_user: dict = Depends(get_current_user),
|
|
site_url: Optional[str] = None
|
|
) -> Dict[str, Any]:
|
|
"""Get raw Bing data for the specified site."""
|
|
try:
|
|
user_id = str(current_user.get('id'))
|
|
db_session = get_db_session(user_id)
|
|
|
|
if not db_session:
|
|
logger.error("No database session available")
|
|
raise HTTPException(status_code=500, detail="Database connection failed")
|
|
|
|
try:
|
|
# Use SEO dashboard service to get Bing data
|
|
dashboard_service = SEODashboardService(db_session)
|
|
bing_data = await dashboard_service.get_bing_data(user_id, site_url)
|
|
|
|
logger.info(f"Retrieved Bing raw data for user {user_id}")
|
|
return bing_data
|
|
|
|
finally:
|
|
db_session.close()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting Bing raw data: {e}")
|
|
raise HTTPException(status_code=500, detail="Failed to get Bing data")
|
|
|
|
async def get_competitive_insights(
|
|
current_user: dict = Depends(get_current_user),
|
|
site_url: Optional[str] = None
|
|
) -> Dict[str, Any]:
|
|
"""Get competitive insights from onboarding step 3 data."""
|
|
try:
|
|
user_id = str(current_user.get('id'))
|
|
db_session = get_db_session(user_id)
|
|
|
|
if not db_session:
|
|
logger.error("No database session available")
|
|
raise HTTPException(status_code=500, detail="Database connection failed")
|
|
|
|
try:
|
|
# Use SEO dashboard service to get competitive insights
|
|
dashboard_service = SEODashboardService(db_session)
|
|
insights_data = await dashboard_service.get_competitive_insights(user_id)
|
|
|
|
logger.info(f"Retrieved competitive insights for user {user_id}")
|
|
return insights_data
|
|
|
|
finally:
|
|
db_session.close()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting competitive insights: {e}")
|
|
raise HTTPException(status_code=500, detail="Failed to get competitive insights")
|
|
|
|
|
|
async def get_deep_competitor_analysis(
|
|
current_user: dict = Depends(get_current_user),
|
|
site_url: Optional[str] = None
|
|
) -> Dict[str, Any]:
|
|
try:
|
|
user_id = str(current_user.get('id'))
|
|
db_session = get_session_for_user(user_id)
|
|
|
|
if not db_session:
|
|
logger.error("No database session available")
|
|
raise HTTPException(status_code=500, detail="Database connection failed")
|
|
|
|
try:
|
|
integration_service = OnboardingDataIntegrationService()
|
|
integrated = integration_service.get_integrated_data_sync(user_id, db_session)
|
|
deep = integrated.get("deep_competitor_analysis") if isinstance(integrated, dict) else None
|
|
return deep or {
|
|
"status": "not_available",
|
|
"last_run": None,
|
|
"report": None
|
|
}
|
|
finally:
|
|
db_session.close()
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error getting deep competitor analysis: {e}")
|
|
raise HTTPException(status_code=500, detail="Failed to get deep competitor analysis")
|
|
|
|
|
|
async def run_strategic_insights(
|
|
current_user: dict = Depends(get_current_user)
|
|
) -> Dict[str, Any]:
|
|
"""Run AI-powered strategic insights analysis manually."""
|
|
try:
|
|
user_id = str(current_user.get('id'))
|
|
db_session = get_session_for_user(user_id)
|
|
|
|
if not db_session:
|
|
raise HTTPException(status_code=500, detail="Database connection failed")
|
|
|
|
try:
|
|
integration_service = OnboardingDataIntegrationService()
|
|
integrated = integration_service.get_integrated_data_sync(user_id, db_session)
|
|
|
|
website_analysis_data = integrated.get("website_analysis")
|
|
logger.info(f"Integrated data for user {user_id}: website_analysis found? {bool(website_analysis_data)}")
|
|
|
|
# Fallback: If not found in integrated data (e.g. strict session mismatch), find latest analysis for user
|
|
if not website_analysis_data:
|
|
logger.info(f"Attempting fallback for user {user_id}")
|
|
# Find latest WebsiteAnalysis for this user across all sessions
|
|
latest_analysis = db_session.query(WebsiteAnalysis).join(
|
|
OnboardingSession, WebsiteAnalysis.session_id == OnboardingSession.id
|
|
).filter(
|
|
OnboardingSession.user_id == user_id
|
|
).order_by(WebsiteAnalysis.updated_at.desc()).first()
|
|
|
|
if latest_analysis:
|
|
logger.info(f"Found fallback WebsiteAnalysis {latest_analysis.id} for user {user_id}")
|
|
website_analysis_data = latest_analysis.to_dict()
|
|
# Ensure ID is present for updates
|
|
website_analysis_data['id'] = latest_analysis.id
|
|
else:
|
|
logger.warning(f"Fallback failed for user {user_id}. No WebsiteAnalysis found.")
|
|
|
|
if not website_analysis_data:
|
|
raise HTTPException(status_code=400, detail="Website analysis (Step 2) not found. Please complete onboarding.")
|
|
|
|
research_prefs = integrated.get("research_preferences")
|
|
competitors = (research_prefs.get("competitors") if isinstance(research_prefs, dict) else None)
|
|
|
|
if not competitors:
|
|
# Try competitor_analysis as fallback
|
|
competitors = integrated.get("competitor_analysis") or []
|
|
|
|
if not competitors:
|
|
raise HTTPException(status_code=400, detail="No competitors found. Please add competitors in Step 3.")
|
|
|
|
from services.seo.deep_competitor_analysis_service import DeepCompetitorAnalysisService
|
|
analysis_service = DeepCompetitorAnalysisService()
|
|
|
|
logger.info(f"Running manual strategic insights for user {user_id}")
|
|
report = await analysis_service.generate_weekly_strategy_brief(
|
|
user_id=user_id,
|
|
website_analysis=website_analysis_data if isinstance(website_analysis_data, dict) else {},
|
|
competitors=competitors if isinstance(competitors, list) else []
|
|
)
|
|
|
|
# Find the WebsiteAnalysis record to persist history
|
|
analysis_id = website_analysis_data.get('id') if isinstance(website_analysis_data, dict) else None
|
|
if analysis_id:
|
|
website_analysis = db_session.query(WebsiteAnalysis).filter(WebsiteAnalysis.id == analysis_id).first()
|
|
|
|
if website_analysis:
|
|
history = website_analysis.strategic_insights_history or []
|
|
if not isinstance(history, list):
|
|
history = []
|
|
|
|
# Append new report at the beginning (latest first)
|
|
history.insert(0, report)
|
|
# Keep last 52 weeks (1 year)
|
|
website_analysis.strategic_insights_history = history[:52]
|
|
flag_modified(website_analysis, "strategic_insights_history")
|
|
db_session.commit()
|
|
logger.info(f"Persisted strategic insight for user {user_id} to history")
|
|
|
|
return {"success": True, "report": report}
|
|
finally:
|
|
db_session.close()
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error running strategic insights: {e}")
|
|
raise HTTPException(status_code=500, detail=f"Failed to run strategic insights: {str(e)}")
|
|
|
|
|
|
@router.post("/refresh-data")
|
|
async def refresh_analytics_data(current_user: dict = Depends(get_current_user), site_url: str = None):
|
|
"""Force refresh of analytics data from GSC/Bing."""
|
|
# This would trigger background jobs to fetch fresh data
|
|
try:
|
|
user_id = str(current_user.get('id'))
|
|
db_session = get_db_session(user_id)
|
|
|
|
if not db_session:
|
|
raise HTTPException(status_code=500, detail="Database connection unavailable")
|
|
|
|
try:
|
|
dashboard_service = SEODashboardService(db_session)
|
|
return await dashboard_service.refresh_analytics_data(user_id, site_url)
|
|
finally:
|
|
db_session.close()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error refreshing analytics data: {e}")
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
@router.get("/strategic-insights-history")
|
|
async def get_strategic_insights_history(
|
|
current_user: dict = Depends(get_current_user)
|
|
) -> List[Dict[str, Any]]:
|
|
"""Get history of strategic insights reports."""
|
|
try:
|
|
user_id = str(current_user.get('id'))
|
|
db_session = get_db_session(user_id)
|
|
|
|
if not db_session:
|
|
raise HTTPException(status_code=500, detail="Database connection unavailable")
|
|
|
|
try:
|
|
# Get latest website analysis
|
|
latest_analysis = db_session.query(WebsiteAnalysis).join(
|
|
OnboardingSession, WebsiteAnalysis.session_id == OnboardingSession.id
|
|
).filter(
|
|
OnboardingSession.user_id == user_id
|
|
).order_by(WebsiteAnalysis.updated_at.desc()).first()
|
|
|
|
if not latest_analysis:
|
|
return []
|
|
|
|
return latest_analysis.strategic_insights_history or []
|
|
|
|
finally:
|
|
db_session.close()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error fetching strategic insights history: {str(e)}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
# Helper methods for data conversion
|
|
def _convert_metrics(summary_data: Dict[str, Any]) -> Dict[str, SEOMetric]:
|
|
"""Convert summary data to SEOMetric format."""
|
|
try:
|
|
return {
|
|
"traffic": SEOMetric(
|
|
value=summary_data.get("clicks", 0),
|
|
change=0, # Would calculate from historical data
|
|
trend="up",
|
|
description="Organic traffic",
|
|
color="#4CAF50"
|
|
),
|
|
"rankings": SEOMetric(
|
|
value=summary_data.get("position", 0),
|
|
change=0, # Would calculate from historical data
|
|
trend="up",
|
|
description="Average ranking",
|
|
color="#2196F3"
|
|
),
|
|
"mobile": SEOMetric(
|
|
value=0, # Would get from performance data
|
|
change=0,
|
|
trend="stable",
|
|
description="Mobile speed",
|
|
color="#FF9800"
|
|
),
|
|
"keywords": SEOMetric(
|
|
value=0, # Would count from query data
|
|
change=0,
|
|
trend="up",
|
|
description="Keywords tracked",
|
|
color="#9C27B0"
|
|
)
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error converting metrics: {e}")
|
|
return {}
|
|
|
|
def _convert_platforms(platform_data: Dict[str, Any]) -> Dict[str, PlatformStatus]:
|
|
"""Convert platform data to PlatformStatus format."""
|
|
try:
|
|
return {
|
|
"google_search_console": PlatformStatus(
|
|
status="connected" if platform_data.get("gsc", {}).get("connected", False) else "disconnected",
|
|
connected=platform_data.get("gsc", {}).get("connected", False),
|
|
last_sync=platform_data.get("gsc", {}).get("last_sync"),
|
|
data_points=len(platform_data.get("gsc", {}).get("sites", []))
|
|
),
|
|
"bing_webmaster": PlatformStatus(
|
|
status="connected" if platform_data.get("bing", {}).get("connected", False) else "disconnected",
|
|
connected=platform_data.get("bing", {}).get("connected", False),
|
|
last_sync=platform_data.get("bing", {}).get("last_sync"),
|
|
data_points=len(platform_data.get("bing", {}).get("sites", []))
|
|
)
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error converting platforms: {e}")
|
|
return {}
|