1595 lines
62 KiB
Python
1595 lines
62 KiB
Python
"""
|
|
AI SEO Tools FastAPI Router
|
|
|
|
This module provides FastAPI endpoints for all AI SEO tools migrated from ToBeMigrated/ai_seo_tools.
|
|
Includes intelligent logging, exception handling, and structured responses.
|
|
"""
|
|
|
|
from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks, UploadFile, File
|
|
from fastapi.responses import JSONResponse
|
|
from pydantic import BaseModel, HttpUrl, Field, validator
|
|
from typing import Dict, Any, List, Optional, Union
|
|
from datetime import datetime
|
|
import json
|
|
import traceback
|
|
from loguru import logger
|
|
import os
|
|
import tempfile
|
|
import asyncio
|
|
|
|
from services.workspace_dirs import ensure_global_operational_dirs
|
|
|
|
# Import services
|
|
from services.llm_providers.main_text_generation import llm_text_gen
|
|
from services.seo_tools.meta_description_service import MetaDescriptionService
|
|
from services.seo_tools.pagespeed_service import PageSpeedService
|
|
from services.seo_tools.sitemap_service import SitemapService
|
|
from services.seo_tools.image_alt_service import ImageAltService
|
|
from services.seo_tools.opengraph_service import OpenGraphService
|
|
from services.seo_tools.on_page_seo_service import OnPageSEOService
|
|
from services.seo_tools.technical_seo_service import TechnicalSEOService
|
|
from services.seo_tools.enterprise_seo_service import EnterpriseSEOService
|
|
from services.seo_tools.gsc_analyzer_service import GSCAnalyzerService
|
|
from services.seo_tools.content_strategy_service import ContentStrategyService
|
|
from services.seo_tools.llm_insights_service import LLMInsightsService
|
|
from services.database import get_session_for_user
|
|
from api.content_planning.services.content_strategy.onboarding import OnboardingDataIntegrationService
|
|
from middleware.logging_middleware import log_api_call, save_to_file
|
|
from middleware.auth_middleware import get_current_user
|
|
|
|
router = APIRouter(prefix="/api/seo", tags=["AI SEO Tools"])
|
|
|
|
# Configuration for intelligent logging
|
|
LOG_DIR = "logs/seo_tools"
|
|
|
|
|
|
def ensure_seo_logging_dir() -> str:
|
|
"""Create SEO log directory at runtime (no import-time writes)."""
|
|
ensure_global_operational_dirs({"logs"})
|
|
os.makedirs(LOG_DIR, exist_ok=True)
|
|
return LOG_DIR
|
|
|
|
# Request/Response Models
|
|
class BaseResponse(BaseModel):
|
|
"""Base response model for all SEO tools"""
|
|
success: bool
|
|
message: str
|
|
timestamp: datetime = Field(default_factory=datetime.utcnow)
|
|
execution_time: Optional[float] = None
|
|
data: Optional[Dict[str, Any]] = None
|
|
|
|
class ErrorResponse(BaseResponse):
|
|
"""Error response model"""
|
|
error_type: str
|
|
error_details: Optional[str] = None
|
|
traceback: Optional[str] = None
|
|
|
|
class MetaDescriptionRequest(BaseModel):
|
|
"""Request model for meta description generation"""
|
|
keywords: List[str] = Field(..., description="Target keywords for meta description")
|
|
tone: str = Field(default="General", description="Desired tone for meta description")
|
|
search_intent: str = Field(default="Informational Intent", description="Search intent type")
|
|
language: str = Field(default="English", description="Preferred language")
|
|
custom_prompt: Optional[str] = Field(None, description="Custom prompt for generation")
|
|
|
|
@validator('keywords')
|
|
def validate_keywords(cls, v):
|
|
if not v or len(v) == 0:
|
|
raise ValueError("At least one keyword is required")
|
|
return v
|
|
|
|
class PageSpeedRequest(BaseModel):
|
|
"""Request model for PageSpeed Insights analysis"""
|
|
url: HttpUrl = Field(..., description="URL to analyze")
|
|
strategy: str = Field(default="DESKTOP", description="Analysis strategy (DESKTOP/MOBILE)")
|
|
locale: str = Field(default="en", description="Locale for analysis")
|
|
categories: List[str] = Field(default=["performance", "accessibility", "best-practices", "seo"])
|
|
|
|
class SitemapAnalysisRequest(BaseModel):
|
|
"""Request model for sitemap analysis"""
|
|
sitemap_url: HttpUrl = Field(..., description="Sitemap URL to analyze")
|
|
analyze_content_trends: bool = Field(default=True, description="Analyze content trends")
|
|
analyze_publishing_patterns: bool = Field(default=True, description="Analyze publishing patterns")
|
|
|
|
class ImageAltRequest(BaseModel):
|
|
"""Request model for image alt text generation"""
|
|
image_url: Optional[HttpUrl] = Field(None, description="URL of image to analyze")
|
|
context: Optional[str] = Field(None, description="Context about the image")
|
|
keywords: Optional[List[str]] = Field(None, description="Keywords to include in alt text")
|
|
|
|
class OpenGraphRequest(BaseModel):
|
|
"""Request model for OpenGraph tag generation"""
|
|
url: HttpUrl = Field(..., description="URL for OpenGraph tags")
|
|
title_hint: Optional[str] = Field(None, description="Hint for title")
|
|
description_hint: Optional[str] = Field(None, description="Hint for description")
|
|
platform: str = Field(default="General", description="Platform (General/Facebook/Twitter)")
|
|
|
|
class OnPageSEORequest(BaseModel):
|
|
"""Request model for on-page SEO analysis"""
|
|
url: HttpUrl = Field(..., description="URL to analyze")
|
|
target_keywords: Optional[List[str]] = Field(None, description="Target keywords for analysis")
|
|
analyze_images: bool = Field(default=True, description="Include image analysis")
|
|
analyze_content_quality: bool = Field(default=True, description="Analyze content quality")
|
|
|
|
class TechnicalSEORequest(BaseModel):
|
|
"""Request model for technical SEO analysis"""
|
|
url: HttpUrl = Field(..., description="URL to crawl and analyze")
|
|
crawl_depth: int = Field(default=3, description="Crawl depth (1-5)")
|
|
include_external_links: bool = Field(default=True, description="Include external link analysis")
|
|
analyze_performance: bool = Field(default=True, description="Include performance analysis")
|
|
|
|
class WorkflowRequest(BaseModel):
|
|
"""Request model for SEO workflow execution"""
|
|
website_url: HttpUrl = Field(..., description="Primary website URL")
|
|
workflow_type: str = Field(..., description="Type of workflow to execute")
|
|
competitors: Optional[List[HttpUrl]] = Field(None, description="Competitor URLs (max 5)")
|
|
target_keywords: Optional[List[str]] = Field(None, description="Target keywords")
|
|
custom_parameters: Optional[Dict[str, Any]] = Field(None, description="Custom workflow parameters")
|
|
|
|
class CompetitiveSitemapBenchmarkingRunRequest(BaseModel):
|
|
max_competitors: int = Field(default=5, ge=1, le=10, description="Max competitors to analyze")
|
|
competitors: Optional[List[HttpUrl]] = Field(None, description="Optional explicit competitor URLs")
|
|
|
|
class EnterpriseAuditRequest(BaseModel):
|
|
"""Request model for complete enterprise SEO audit"""
|
|
website_url: HttpUrl = Field(..., description="Primary website URL to audit")
|
|
competitors: Optional[List[HttpUrl]] = Field(None, description="Competitor URLs for benchmarking (max 5)")
|
|
target_keywords: Optional[List[str]] = Field(None, description="Target keywords for analysis")
|
|
include_content_analysis: bool = Field(default=True, description="Include content strategy analysis")
|
|
include_competitive_analysis: bool = Field(default=True, description="Include competitive benchmarking")
|
|
generate_executive_report: bool = Field(default=True, description="Generate executive summary")
|
|
|
|
class GSCAnalysisRequest(BaseModel):
|
|
"""Request model for advanced GSC analysis"""
|
|
site_url: HttpUrl = Field(..., description="Website URL registered in Google Search Console")
|
|
date_range_days: int = Field(default=90, ge=7, le=365, description="Number of days to analyze")
|
|
include_opportunities: bool = Field(default=True, description="Include content opportunity analysis")
|
|
include_competitive: bool = Field(default=True, description="Include competitive positioning")
|
|
|
|
class ContentOpportunitiesRequest(BaseModel):
|
|
"""Request model for content opportunities report"""
|
|
site_url: HttpUrl = Field(..., description="Website URL registered in GSC")
|
|
min_impressions: int = Field(default=100, ge=10, description="Minimum impressions threshold")
|
|
date_range_days: int = Field(default=90, ge=7, le=365, description="Number of days to analyze")
|
|
|
|
# ==================== LLM INSIGHTS REQUEST MODELS ====================
|
|
|
|
class EnterpriseAuditInsightsRequest(BaseModel):
|
|
"""Request model for AI insights from enterprise audit"""
|
|
audit_results: Dict[str, Any] = Field(..., description="Complete audit results")
|
|
website_url: str = Field(..., description="Website being audited")
|
|
target_keywords: Optional[List[str]] = Field(None, description="Target keywords")
|
|
|
|
class GSCAnalysisInsightsRequest(BaseModel):
|
|
"""Request model for AI insights from GSC analysis"""
|
|
gsc_analysis: Dict[str, Any] = Field(..., description="Complete GSC analysis data")
|
|
website_url: str = Field(..., description="Website being analyzed")
|
|
|
|
class ContentStrategyRequest(BaseModel):
|
|
"""Request model for content strategy generation"""
|
|
current_content: Dict[str, Any] = Field(..., description="Current content analysis")
|
|
content_gaps: List[str] = Field(..., description="Identified content gaps")
|
|
target_keywords: List[str] = Field(..., description="Target keywords")
|
|
competitor_content: Optional[Dict[str, Any]] = Field(None, description="Competitor content analysis")
|
|
|
|
class TrafficRoadmapRequest(BaseModel):
|
|
"""Request model for traffic improvement roadmap"""
|
|
current_metrics: Dict[str, Any] = Field(..., description="Current traffic metrics")
|
|
identified_opportunities: List[Dict[str, Any]] = Field(..., description="Improvement opportunities")
|
|
implementation_timeline_weeks: int = Field(default=12, ge=4, le=52, description="Implementation timeline")
|
|
|
|
class CompetitiveInsightsRequest(BaseModel):
|
|
"""Request model for competitive insights generation"""
|
|
primary_site_analysis: Dict[str, Any] = Field(..., description="Primary site analysis")
|
|
competitor_analyses: List[Dict[str, Any]] = Field(..., description="Competitor analyses")
|
|
|
|
class PrioritizedRecommendationsRequest(BaseModel):
|
|
"""Request model for prioritized recommendations"""
|
|
all_recommendations: List[Dict[str, Any]] = Field(..., description="All recommendations to prioritize")
|
|
business_context: Dict[str, Any] = Field(..., description="Business goals and constraints")
|
|
|
|
class QuickWinsRequest(BaseModel):
|
|
"""Request model for quick wins identification"""
|
|
audit_data: Dict[str, Any] = Field(..., description="Complete audit data")
|
|
max_days_to_implement: int = Field(default=7, ge=1, le=30, description="Maximum days to implement")
|
|
|
|
class KeywordExpansionRequest(BaseModel):
|
|
"""Request model for keyword expansion"""
|
|
current_keywords: List[str] = Field(..., description="Current target keywords")
|
|
content_analysis: Dict[str, Any] = Field(..., description="Content analysis data")
|
|
target_difficulty: Optional[str] = Field(None, description="Target difficulty (low/medium/high)")
|
|
|
|
# Exception Handler
|
|
async def handle_seo_tool_exception(func_name: str, error: Exception, request_data: Dict) -> ErrorResponse:
|
|
"""Handle exceptions from SEO tools with intelligent logging"""
|
|
error_id = f"seo_{func_name}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}"
|
|
error_msg = str(error)
|
|
error_trace = traceback.format_exc()
|
|
|
|
# Log error with structured data
|
|
error_log = {
|
|
"error_id": error_id,
|
|
"function": func_name,
|
|
"error_type": type(error).__name__,
|
|
"error_message": error_msg,
|
|
"request_data": request_data,
|
|
"traceback": error_trace,
|
|
"timestamp": datetime.utcnow().isoformat()
|
|
}
|
|
|
|
logger.error(f"SEO Tool Error [{error_id}]: {error_msg}")
|
|
|
|
# Save error to file
|
|
await save_to_file(f"{LOG_DIR}/errors.jsonl", error_log)
|
|
|
|
return ErrorResponse(
|
|
success=False,
|
|
message=f"Error in {func_name}: {error_msg}",
|
|
error_type=type(error).__name__,
|
|
error_details=error_msg,
|
|
traceback=error_trace if os.getenv("DEBUG", "false").lower() == "true" else None
|
|
)
|
|
|
|
# SEO Tool Endpoints
|
|
|
|
@router.post("/meta-description", response_model=BaseResponse)
|
|
@log_api_call
|
|
async def generate_meta_description(
|
|
request: MetaDescriptionRequest,
|
|
background_tasks: BackgroundTasks,
|
|
current_user: dict = Depends(get_current_user)
|
|
) -> Union[BaseResponse, ErrorResponse]:
|
|
"""
|
|
Generate AI-powered SEO meta descriptions
|
|
|
|
Generates compelling, SEO-optimized meta descriptions based on keywords,
|
|
tone, and search intent using advanced AI analysis.
|
|
"""
|
|
start_time = datetime.utcnow()
|
|
|
|
try:
|
|
user_id = str(current_user.get("id")) if current_user else None
|
|
service = MetaDescriptionService()
|
|
result = await service.generate_meta_description(
|
|
keywords=request.keywords,
|
|
tone=request.tone,
|
|
search_intent=request.search_intent,
|
|
language=request.language,
|
|
custom_prompt=request.custom_prompt,
|
|
user_id=user_id
|
|
)
|
|
|
|
execution_time = (datetime.utcnow() - start_time).total_seconds()
|
|
|
|
# Log successful operation
|
|
log_data = {
|
|
"operation": "meta_description_generation",
|
|
"keywords_count": len(request.keywords),
|
|
"tone": request.tone,
|
|
"language": request.language,
|
|
"execution_time": execution_time,
|
|
"success": True
|
|
}
|
|
background_tasks.add_task(save_to_file, f"{LOG_DIR}/operations.jsonl", log_data)
|
|
|
|
return BaseResponse(
|
|
success=True,
|
|
message="Meta description generated successfully",
|
|
execution_time=execution_time,
|
|
data=result
|
|
)
|
|
|
|
except Exception as e:
|
|
return await handle_seo_tool_exception("generate_meta_description", e, request.dict())
|
|
|
|
@router.post("/pagespeed-analysis", response_model=BaseResponse)
|
|
@log_api_call
|
|
async def analyze_pagespeed(
|
|
request: PageSpeedRequest,
|
|
background_tasks: BackgroundTasks,
|
|
current_user: dict = Depends(get_current_user)
|
|
) -> Union[BaseResponse, ErrorResponse]:
|
|
"""
|
|
Analyze website performance using Google PageSpeed Insights
|
|
|
|
Provides comprehensive performance analysis including Core Web Vitals,
|
|
accessibility, SEO, and best practices scores with AI-enhanced insights.
|
|
"""
|
|
start_time = datetime.utcnow()
|
|
|
|
try:
|
|
user_id = str(current_user.get("id")) if current_user else None
|
|
service = PageSpeedService()
|
|
result = await service.analyze_pagespeed(
|
|
url=str(request.url),
|
|
strategy=request.strategy,
|
|
locale=request.locale,
|
|
categories=request.categories,
|
|
user_id=user_id
|
|
)
|
|
|
|
execution_time = (datetime.utcnow() - start_time).total_seconds()
|
|
|
|
# Log successful operation
|
|
log_data = {
|
|
"operation": "pagespeed_analysis",
|
|
"url": str(request.url),
|
|
"strategy": request.strategy,
|
|
"categories": request.categories,
|
|
"execution_time": execution_time,
|
|
"success": True
|
|
}
|
|
background_tasks.add_task(save_to_file, f"{LOG_DIR}/operations.jsonl", log_data)
|
|
|
|
return BaseResponse(
|
|
success=True,
|
|
message="PageSpeed analysis completed successfully",
|
|
execution_time=execution_time,
|
|
data=result
|
|
)
|
|
|
|
except Exception as e:
|
|
return await handle_seo_tool_exception("analyze_pagespeed", e, request.dict())
|
|
|
|
@router.post("/sitemap-analysis", response_model=BaseResponse)
|
|
@log_api_call
|
|
async def analyze_sitemap(
|
|
request: SitemapAnalysisRequest,
|
|
background_tasks: BackgroundTasks,
|
|
current_user: dict = Depends(get_current_user)
|
|
) -> Union[BaseResponse, ErrorResponse]:
|
|
"""
|
|
Analyze website sitemap for content structure and trends
|
|
|
|
Provides insights into content distribution, publishing patterns,
|
|
and SEO opportunities with AI-powered recommendations.
|
|
"""
|
|
start_time = datetime.utcnow()
|
|
|
|
try:
|
|
user_id = str(current_user.get("id")) if current_user else None
|
|
service = SitemapService()
|
|
result = await service.analyze_sitemap(
|
|
sitemap_url=str(request.sitemap_url),
|
|
analyze_content_trends=request.analyze_content_trends,
|
|
analyze_publishing_patterns=request.analyze_publishing_patterns,
|
|
user_id=user_id
|
|
)
|
|
|
|
execution_time = (datetime.utcnow() - start_time).total_seconds()
|
|
|
|
# Log successful operation
|
|
log_data = {
|
|
"operation": "sitemap_analysis",
|
|
"sitemap_url": str(request.sitemap_url),
|
|
"urls_found": result.get("total_urls", 0),
|
|
"execution_time": execution_time,
|
|
"success": True
|
|
}
|
|
background_tasks.add_task(save_to_file, f"{LOG_DIR}/operations.jsonl", log_data)
|
|
|
|
return BaseResponse(
|
|
success=True,
|
|
message="Sitemap analysis completed successfully",
|
|
execution_time=execution_time,
|
|
data=result
|
|
)
|
|
|
|
except Exception as e:
|
|
return await handle_seo_tool_exception("analyze_sitemap", e, request.dict())
|
|
|
|
@router.post("/image-alt-text", response_model=BaseResponse)
|
|
@log_api_call
|
|
async def generate_image_alt_text(
|
|
request: ImageAltRequest = None,
|
|
image_file: UploadFile = File(None),
|
|
background_tasks: BackgroundTasks = BackgroundTasks()
|
|
) -> Union[BaseResponse, ErrorResponse]:
|
|
"""
|
|
Generate AI-powered alt text for images
|
|
|
|
Creates SEO-optimized alt text for images using advanced AI vision
|
|
models with context-aware keyword integration.
|
|
"""
|
|
start_time = datetime.utcnow()
|
|
|
|
try:
|
|
service = ImageAltService()
|
|
|
|
if image_file:
|
|
# Handle uploaded file
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix=f".{image_file.filename.split('.')[-1]}") as tmp_file:
|
|
content = await image_file.read()
|
|
tmp_file.write(content)
|
|
tmp_file_path = tmp_file.name
|
|
|
|
result = await service.generate_alt_text_from_file(
|
|
image_path=tmp_file_path,
|
|
context=request.context if request else None,
|
|
keywords=request.keywords if request else None
|
|
)
|
|
|
|
# Cleanup
|
|
os.unlink(tmp_file_path)
|
|
|
|
elif request and request.image_url:
|
|
result = await service.generate_alt_text_from_url(
|
|
image_url=str(request.image_url),
|
|
context=request.context,
|
|
keywords=request.keywords
|
|
)
|
|
else:
|
|
raise ValueError("Either image_file or image_url must be provided")
|
|
|
|
execution_time = (datetime.utcnow() - start_time).total_seconds()
|
|
|
|
# Log successful operation
|
|
log_data = {
|
|
"operation": "image_alt_text_generation",
|
|
"has_image_file": image_file is not None,
|
|
"has_image_url": request.image_url is not None if request else False,
|
|
"execution_time": execution_time,
|
|
"success": True
|
|
}
|
|
background_tasks.add_task(save_to_file, f"{LOG_DIR}/operations.jsonl", log_data)
|
|
|
|
return BaseResponse(
|
|
success=True,
|
|
message="Image alt text generated successfully",
|
|
execution_time=execution_time,
|
|
data=result
|
|
)
|
|
|
|
except Exception as e:
|
|
return await handle_seo_tool_exception("generate_image_alt_text", e,
|
|
request.dict() if request else {})
|
|
|
|
@router.post("/opengraph-tags", response_model=BaseResponse)
|
|
@log_api_call
|
|
async def generate_opengraph_tags(
|
|
request: OpenGraphRequest,
|
|
background_tasks: BackgroundTasks
|
|
) -> Union[BaseResponse, ErrorResponse]:
|
|
"""
|
|
Generate OpenGraph tags for social media optimization
|
|
|
|
Creates platform-specific OpenGraph tags optimized for Facebook,
|
|
Twitter, and other social platforms with AI-powered content analysis.
|
|
"""
|
|
start_time = datetime.utcnow()
|
|
|
|
try:
|
|
service = OpenGraphService()
|
|
result = await service.generate_opengraph_tags(
|
|
url=str(request.url),
|
|
title_hint=request.title_hint,
|
|
description_hint=request.description_hint,
|
|
platform=request.platform
|
|
)
|
|
|
|
execution_time = (datetime.utcnow() - start_time).total_seconds()
|
|
|
|
# Log successful operation
|
|
log_data = {
|
|
"operation": "opengraph_generation",
|
|
"url": str(request.url),
|
|
"platform": request.platform,
|
|
"execution_time": execution_time,
|
|
"success": True
|
|
}
|
|
background_tasks.add_task(save_to_file, f"{LOG_DIR}/operations.jsonl", log_data)
|
|
|
|
return BaseResponse(
|
|
success=True,
|
|
message="OpenGraph tags generated successfully",
|
|
execution_time=execution_time,
|
|
data=result
|
|
)
|
|
|
|
except Exception as e:
|
|
return await handle_seo_tool_exception("generate_opengraph_tags", e, request.dict())
|
|
|
|
@router.post("/on-page-analysis", response_model=BaseResponse)
|
|
@log_api_call
|
|
async def analyze_on_page_seo(
|
|
request: OnPageSEORequest,
|
|
background_tasks: BackgroundTasks
|
|
) -> Union[BaseResponse, ErrorResponse]:
|
|
"""
|
|
Comprehensive on-page SEO analysis
|
|
|
|
Analyzes meta tags, content quality, keyword optimization, internal linking,
|
|
and provides actionable AI-powered recommendations for improvement.
|
|
"""
|
|
start_time = datetime.utcnow()
|
|
|
|
try:
|
|
service = OnPageSEOService()
|
|
result = await service.analyze_on_page_seo(
|
|
url=str(request.url),
|
|
target_keywords=request.target_keywords,
|
|
analyze_images=request.analyze_images,
|
|
analyze_content_quality=request.analyze_content_quality
|
|
)
|
|
|
|
execution_time = (datetime.utcnow() - start_time).total_seconds()
|
|
|
|
# Log successful operation
|
|
log_data = {
|
|
"operation": "on_page_seo_analysis",
|
|
"url": str(request.url),
|
|
"target_keywords_count": len(request.target_keywords) if request.target_keywords else 0,
|
|
"seo_score": result.get("overall_score", 0),
|
|
"execution_time": execution_time,
|
|
"success": True
|
|
}
|
|
background_tasks.add_task(save_to_file, f"{LOG_DIR}/operations.jsonl", log_data)
|
|
|
|
return BaseResponse(
|
|
success=True,
|
|
message="On-page SEO analysis completed successfully",
|
|
execution_time=execution_time,
|
|
data=result
|
|
)
|
|
|
|
except Exception as e:
|
|
return await handle_seo_tool_exception("analyze_on_page_seo", e, request.dict())
|
|
|
|
@router.post("/technical-seo", response_model=BaseResponse)
|
|
@log_api_call
|
|
async def analyze_technical_seo(
|
|
request: TechnicalSEORequest,
|
|
background_tasks: BackgroundTasks
|
|
) -> Union[BaseResponse, ErrorResponse]:
|
|
"""
|
|
Technical SEO analysis and crawling
|
|
|
|
Performs comprehensive technical SEO audit including site structure,
|
|
crawlability, indexability, and performance with AI-enhanced insights.
|
|
"""
|
|
start_time = datetime.utcnow()
|
|
|
|
try:
|
|
service = TechnicalSEOService()
|
|
result = await service.analyze_technical_seo(
|
|
url=str(request.url),
|
|
crawl_depth=request.crawl_depth,
|
|
include_external_links=request.include_external_links,
|
|
analyze_performance=request.analyze_performance
|
|
)
|
|
|
|
execution_time = (datetime.utcnow() - start_time).total_seconds()
|
|
|
|
# Log successful operation
|
|
log_data = {
|
|
"operation": "technical_seo_analysis",
|
|
"url": str(request.url),
|
|
"crawl_depth": request.crawl_depth,
|
|
"pages_crawled": result.get("pages_crawled", 0),
|
|
"issues_found": len(result.get("issues", [])),
|
|
"execution_time": execution_time,
|
|
"success": True
|
|
}
|
|
background_tasks.add_task(save_to_file, f"{LOG_DIR}/operations.jsonl", log_data)
|
|
|
|
return BaseResponse(
|
|
success=True,
|
|
message="Technical SEO analysis completed successfully",
|
|
execution_time=execution_time,
|
|
data=result
|
|
)
|
|
|
|
except Exception as e:
|
|
return await handle_seo_tool_exception("analyze_technical_seo", e, request.dict())
|
|
|
|
# Workflow Endpoints
|
|
|
|
@router.post("/workflow/website-audit", response_model=BaseResponse)
|
|
@log_api_call
|
|
async def execute_website_audit(
|
|
request: WorkflowRequest,
|
|
background_tasks: BackgroundTasks
|
|
) -> Union[BaseResponse, ErrorResponse]:
|
|
"""
|
|
Complete website SEO audit workflow
|
|
|
|
Executes a comprehensive SEO audit combining on-page analysis,
|
|
technical SEO, performance analysis, and competitive intelligence.
|
|
"""
|
|
start_time = datetime.utcnow()
|
|
|
|
try:
|
|
service = EnterpriseSEOService()
|
|
result = await service.execute_complete_audit(
|
|
website_url=str(request.website_url),
|
|
competitors=[str(comp) for comp in request.competitors] if request.competitors else [],
|
|
target_keywords=request.target_keywords or []
|
|
)
|
|
|
|
execution_time = (datetime.utcnow() - start_time).total_seconds()
|
|
|
|
# Log successful operation
|
|
log_data = {
|
|
"operation": "website_audit_workflow",
|
|
"website_url": str(request.website_url),
|
|
"competitors_count": len(request.competitors) if request.competitors else 0,
|
|
"overall_score": result.get("overall_score", 0),
|
|
"execution_time": execution_time,
|
|
"success": True
|
|
}
|
|
background_tasks.add_task(save_to_file, f"{LOG_DIR}/workflows.jsonl", log_data)
|
|
|
|
return BaseResponse(
|
|
success=True,
|
|
message="Website audit completed successfully",
|
|
execution_time=execution_time,
|
|
data=result
|
|
)
|
|
|
|
except Exception as e:
|
|
return await handle_seo_tool_exception("execute_website_audit", e, request.dict())
|
|
|
|
@router.post("/workflow/content-analysis", response_model=BaseResponse)
|
|
@log_api_call
|
|
async def execute_content_analysis(
|
|
request: WorkflowRequest,
|
|
background_tasks: BackgroundTasks,
|
|
current_user: dict = Depends(get_current_user)
|
|
) -> Union[BaseResponse, ErrorResponse]:
|
|
"""
|
|
AI-powered content analysis workflow
|
|
|
|
Analyzes content gaps, opportunities, and competitive positioning
|
|
with AI-generated strategic recommendations for content creators.
|
|
"""
|
|
start_time = datetime.utcnow()
|
|
|
|
try:
|
|
user_id = str(current_user.get("id")) if current_user else None
|
|
service = ContentStrategyService()
|
|
result = await service.analyze_content_strategy(
|
|
website_url=str(request.website_url),
|
|
competitors=[str(comp) for comp in request.competitors] if request.competitors else [],
|
|
target_keywords=request.target_keywords or [],
|
|
custom_parameters=request.custom_parameters or {},
|
|
user_id=user_id
|
|
)
|
|
|
|
execution_time = (datetime.utcnow() - start_time).total_seconds()
|
|
|
|
# Log successful operation
|
|
log_data = {
|
|
"operation": "content_analysis_workflow",
|
|
"website_url": str(request.website_url),
|
|
"content_gaps_found": len(result.get("content_gaps", [])),
|
|
"opportunities_identified": len(result.get("opportunities", [])),
|
|
"execution_time": execution_time,
|
|
"success": True
|
|
}
|
|
background_tasks.add_task(save_to_file, f"{LOG_DIR}/workflows.jsonl", log_data)
|
|
|
|
return BaseResponse(
|
|
success=True,
|
|
message="Content analysis completed successfully",
|
|
execution_time=execution_time,
|
|
data=result
|
|
)
|
|
|
|
except Exception as e:
|
|
return await handle_seo_tool_exception("execute_content_analysis", e, request.dict())
|
|
|
|
# Background Task for Sitemap Benchmarking
|
|
async def _run_sitemap_benchmark_background(
|
|
user_id: str,
|
|
website_url: str,
|
|
competitors: List[str],
|
|
max_competitors: int
|
|
):
|
|
"""Background task for running sitemap benchmarking"""
|
|
logger.info(f"Starting background sitemap benchmark for user {user_id}")
|
|
|
|
# Create a new session for the background task
|
|
db = get_session_for_user(user_id)
|
|
if not db:
|
|
logger.error(f"Failed to get database session for user {user_id}")
|
|
return
|
|
|
|
try:
|
|
service = ContentStrategyService()
|
|
integration_service = OnboardingDataIntegrationService()
|
|
|
|
# Run analysis (long running)
|
|
report = await service.analyze_competitive_sitemap_benchmarking(
|
|
website_url=website_url,
|
|
competitors=competitors,
|
|
max_competitors=max_competitors,
|
|
user_id=user_id
|
|
)
|
|
|
|
# Persist results
|
|
persisted = await integration_service.store_competitive_sitemap_benchmarking(user_id, report, db)
|
|
|
|
if persisted:
|
|
logger.info(f"✅ Background sitemap benchmark completed and saved for user {user_id}")
|
|
else:
|
|
logger.error(f"❌ Failed to persist background sitemap benchmark for user {user_id}")
|
|
await integration_service.update_competitive_sitemap_benchmarking_status(user_id, "failed", db, error="Failed to persist results")
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error in background sitemap benchmark for user {user_id}: {str(e)}")
|
|
logger.error(traceback.format_exc())
|
|
try:
|
|
integration_service = OnboardingDataIntegrationService()
|
|
await integration_service.update_competitive_sitemap_benchmarking_status(user_id, "failed", db, error=str(e))
|
|
except Exception as update_err:
|
|
logger.error(f"Failed to update error status: {update_err}")
|
|
finally:
|
|
db.close()
|
|
|
|
@router.post("/competitive-sitemap-benchmarking/run", response_model=BaseResponse)
|
|
@log_api_call
|
|
async def run_competitive_sitemap_benchmarking(
|
|
request: CompetitiveSitemapBenchmarkingRunRequest,
|
|
background_tasks: BackgroundTasks,
|
|
current_user: dict = Depends(get_current_user)
|
|
) -> Union[BaseResponse, ErrorResponse]:
|
|
start_time = datetime.utcnow()
|
|
|
|
try:
|
|
user_id = str(current_user.get("id")) if current_user else None
|
|
if not user_id:
|
|
raise HTTPException(status_code=401, detail="Unauthorized")
|
|
|
|
# Get initial data to validate request
|
|
db = get_session_for_user(user_id)
|
|
if not db:
|
|
raise HTTPException(status_code=500, detail="Database connection failed")
|
|
|
|
try:
|
|
integration_service = OnboardingDataIntegrationService()
|
|
integrated = integration_service.get_integrated_data_sync(user_id, db)
|
|
website_analysis = integrated.get("website_analysis") if isinstance(integrated, dict) else {}
|
|
website_url = website_analysis.get("website_url") if isinstance(website_analysis, dict) else None
|
|
|
|
competitor_urls: List[str] = []
|
|
if request.competitors:
|
|
competitor_urls = [str(c) for c in request.competitors]
|
|
else:
|
|
competitor_analysis = integrated.get("competitor_analysis") if isinstance(integrated, dict) else []
|
|
if isinstance(competitor_analysis, list):
|
|
for comp in competitor_analysis:
|
|
if not isinstance(comp, dict):
|
|
continue
|
|
url = comp.get("competitor_url") or comp.get("url") or comp.get("website_url")
|
|
if url:
|
|
competitor_urls.append(str(url))
|
|
|
|
if not website_url:
|
|
raise HTTPException(status_code=400, detail="No website_url found. Complete onboarding step 2 first.")
|
|
|
|
# Set status to processing
|
|
await integration_service.update_competitive_sitemap_benchmarking_status(user_id, "processing", db)
|
|
|
|
# Queue background task
|
|
background_tasks.add_task(
|
|
_run_sitemap_benchmark_background,
|
|
user_id=user_id,
|
|
website_url=str(website_url),
|
|
competitors=competitor_urls,
|
|
max_competitors=request.max_competitors
|
|
)
|
|
|
|
execution_time = (datetime.utcnow() - start_time).total_seconds()
|
|
|
|
return BaseResponse(
|
|
success=True,
|
|
message="Competitive sitemap benchmarking started in background",
|
|
execution_time=execution_time,
|
|
data={
|
|
"status": "queued",
|
|
"competitors_count": len(competitor_urls)
|
|
}
|
|
)
|
|
finally:
|
|
try:
|
|
db.close()
|
|
except Exception:
|
|
pass
|
|
|
|
except Exception as e:
|
|
return await handle_seo_tool_exception("run_competitive_sitemap_benchmarking", e, request.dict())
|
|
|
|
@router.get("/competitive-sitemap-benchmarking", response_model=BaseResponse)
|
|
@log_api_call
|
|
async def get_competitive_sitemap_benchmarking(
|
|
current_user: dict = Depends(get_current_user)
|
|
) -> Union[BaseResponse, ErrorResponse]:
|
|
try:
|
|
user_id = str(current_user.get("id")) if current_user else None
|
|
if not user_id:
|
|
raise HTTPException(status_code=401, detail="Unauthorized")
|
|
|
|
db = get_session_for_user(user_id)
|
|
if not db:
|
|
raise HTTPException(status_code=500, detail="Database connection failed")
|
|
|
|
try:
|
|
integration_service = OnboardingDataIntegrationService()
|
|
integrated = integration_service.get_integrated_data_sync(user_id, db)
|
|
website_analysis = integrated.get("website_analysis") if isinstance(integrated, dict) else {}
|
|
seo_audit = website_analysis.get("seo_audit") if isinstance(website_analysis, dict) else {}
|
|
report = seo_audit.get("competitive_sitemap_benchmarking") if isinstance(seo_audit, dict) else None
|
|
|
|
return BaseResponse(
|
|
success=True,
|
|
message="Competitive sitemap benchmarking loaded",
|
|
data={
|
|
"report": report
|
|
}
|
|
)
|
|
finally:
|
|
try:
|
|
db.close()
|
|
except Exception:
|
|
pass
|
|
|
|
except Exception as e:
|
|
return await handle_seo_tool_exception("get_competitive_sitemap_benchmarking", e, {})
|
|
|
|
# Health and Status Endpoints
|
|
|
|
@router.get("/health", response_model=BaseResponse)
|
|
async def health_check() -> BaseResponse:
|
|
"""Health check endpoint for SEO tools"""
|
|
return BaseResponse(
|
|
success=True,
|
|
message="AI SEO Tools API is healthy",
|
|
data={
|
|
"status": "operational",
|
|
"available_tools": [
|
|
"meta_description",
|
|
"pagespeed_analysis",
|
|
"sitemap_analysis",
|
|
"image_alt_text",
|
|
"opengraph_tags",
|
|
"on_page_analysis",
|
|
"technical_seo",
|
|
"website_audit",
|
|
"content_analysis"
|
|
],
|
|
"version": "1.0.0"
|
|
}
|
|
)
|
|
|
|
@router.get("/tools/status", response_model=BaseResponse)
|
|
async def get_tools_status() -> BaseResponse:
|
|
"""Get status of all SEO tools and their dependencies"""
|
|
|
|
tools_status = {}
|
|
overall_healthy = True
|
|
|
|
# Check each service
|
|
services = [
|
|
("meta_description", MetaDescriptionService),
|
|
("pagespeed", PageSpeedService),
|
|
("sitemap", SitemapService),
|
|
("image_alt", ImageAltService),
|
|
("opengraph", OpenGraphService),
|
|
("on_page_seo", OnPageSEOService),
|
|
("technical_seo", TechnicalSEOService),
|
|
("enterprise_seo", EnterpriseSEOService),
|
|
("content_strategy", ContentStrategyService)
|
|
]
|
|
|
|
for service_name, service_class in services:
|
|
try:
|
|
service = service_class()
|
|
status = await service.health_check() if hasattr(service, 'health_check') else {"status": "unknown"}
|
|
tools_status[service_name] = {
|
|
"healthy": status.get("status") == "operational",
|
|
"details": status
|
|
}
|
|
if not tools_status[service_name]["healthy"]:
|
|
overall_healthy = False
|
|
except Exception as e:
|
|
tools_status[service_name] = {
|
|
"healthy": False,
|
|
"error": str(e)
|
|
}
|
|
overall_healthy = False
|
|
|
|
return BaseResponse(
|
|
success=overall_healthy,
|
|
message="Tools status check completed",
|
|
data={
|
|
"overall_healthy": overall_healthy,
|
|
"tools": tools_status,
|
|
"timestamp": datetime.utcnow().isoformat()
|
|
}
|
|
)
|
|
|
|
|
|
# ==================== ENTERPRISE AUDIT ENDPOINTS ====================
|
|
|
|
@router.post("/enterprise/complete-audit", response_model=BaseResponse)
|
|
@log_api_call
|
|
async def execute_enterprise_audit(
|
|
request: EnterpriseAuditRequest,
|
|
background_tasks: BackgroundTasks,
|
|
current_user: dict = Depends(get_current_user)
|
|
) -> Union[BaseResponse, ErrorResponse]:
|
|
"""
|
|
Execute comprehensive enterprise SEO audit with full orchestration.
|
|
|
|
Combines multiple SEO analysis tools into an intelligent workflow:
|
|
- Technical SEO audit with issue severity classification
|
|
- On-page SEO analysis with keyword optimization
|
|
- PageSpeed Insights with Core Web Vitals analysis
|
|
- Sitemap analysis with trend detection
|
|
- Content strategy with competitive comparison
|
|
- Competitive benchmarking across specified competitors
|
|
- AI-powered insights and recommendations
|
|
|
|
Returns prioritized action items with implementation roadmap.
|
|
"""
|
|
start_time = datetime.utcnow()
|
|
|
|
try:
|
|
logger.info(f"Starting enterprise audit for {request.website_url}")
|
|
|
|
# Initialize service
|
|
enterprise_service = EnterpriseSEOService()
|
|
|
|
# Execute audit
|
|
audit_result = await enterprise_service.execute_complete_audit(
|
|
website_url=str(request.website_url),
|
|
competitors=[str(c) for c in request.competitors] if request.competitors else [],
|
|
target_keywords=request.target_keywords or [],
|
|
include_content_analysis=request.include_content_analysis,
|
|
include_competitive_analysis=request.include_competitive_analysis,
|
|
generate_executive_report=request.generate_executive_report
|
|
)
|
|
|
|
execution_time = (datetime.utcnow() - start_time).total_seconds()
|
|
|
|
return BaseResponse(
|
|
success=True,
|
|
message="Complete enterprise audit executed successfully",
|
|
execution_time=execution_time,
|
|
data=audit_result
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Enterprise audit failed: {str(e)}", exc_info=True)
|
|
return await handle_seo_tool_exception("execute_enterprise_audit", e, request.dict())
|
|
|
|
|
|
@router.post("/enterprise/quick-audit", response_model=BaseResponse)
|
|
@log_api_call
|
|
async def execute_quick_enterprise_audit(
|
|
website_url: HttpUrl,
|
|
current_user: dict = Depends(get_current_user)
|
|
) -> Union[BaseResponse, ErrorResponse]:
|
|
"""
|
|
Execute quick 5-minute enterprise audit focusing on critical issues.
|
|
|
|
Provides rapid assessment of most critical SEO problems:
|
|
- Technical SEO critical issues
|
|
- PageSpeed performance bottlenecks
|
|
- Top 3 actionable recommendations
|
|
- Estimated business impact
|
|
"""
|
|
start_time = datetime.utcnow()
|
|
|
|
try:
|
|
logger.info(f"Starting quick audit for {website_url}")
|
|
|
|
enterprise_service = EnterpriseSEOService()
|
|
audit_result = await enterprise_service.execute_quick_audit(str(website_url))
|
|
|
|
execution_time = (datetime.utcnow() - start_time).total_seconds()
|
|
|
|
return BaseResponse(
|
|
success=True,
|
|
message="Quick audit completed",
|
|
execution_time=execution_time,
|
|
data=audit_result
|
|
)
|
|
|
|
except Exception as e:
|
|
return await handle_seo_tool_exception("execute_quick_enterprise_audit", e, {"website_url": str(website_url)})
|
|
|
|
|
|
# ==================== ADVANCED GSC ANALYSIS ENDPOINTS ====================
|
|
|
|
@router.post("/gsc/analyze-search-performance", response_model=BaseResponse)
|
|
@log_api_call
|
|
async def analyze_gsc_search_performance(
|
|
request: GSCAnalysisRequest,
|
|
current_user: dict = Depends(get_current_user)
|
|
) -> Union[BaseResponse, ErrorResponse]:
|
|
"""
|
|
Advanced Google Search Console analysis with comprehensive insights.
|
|
|
|
Provides deep dive into search performance:
|
|
- Performance overview with aggregated metrics
|
|
- Keyword analysis with trend detection
|
|
- Page-level performance breakdown
|
|
- Content opportunity identification (15+ opportunities scored)
|
|
- Technical SEO signal analysis
|
|
- Competitive positioning assessment
|
|
- AI-powered strategic recommendations
|
|
|
|
Each analysis component includes:
|
|
- Current metrics and trends
|
|
- Performance scores (0-100)
|
|
- Actionable recommendations
|
|
- Implementation priority
|
|
"""
|
|
start_time = datetime.utcnow()
|
|
|
|
try:
|
|
logger.info(f"Starting GSC analysis for {request.site_url}")
|
|
|
|
user_id = str(current_user.get("id")) if current_user else None
|
|
|
|
gsc_service = GSCAnalyzerService()
|
|
analysis_result = await gsc_service.analyze_search_performance(
|
|
site_url=str(request.site_url),
|
|
date_range_days=request.date_range_days,
|
|
user_id=user_id
|
|
)
|
|
|
|
execution_time = (datetime.utcnow() - start_time).total_seconds()
|
|
|
|
return BaseResponse(
|
|
success=True,
|
|
message="GSC search performance analysis completed",
|
|
execution_time=execution_time,
|
|
data=analysis_result
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"GSC analysis failed: {str(e)}", exc_info=True)
|
|
return await handle_seo_tool_exception("analyze_gsc_search_performance", e, request.dict())
|
|
|
|
|
|
@router.post("/gsc/content-opportunities", response_model=BaseResponse)
|
|
@log_api_call
|
|
async def get_content_opportunities_report(
|
|
request: ContentOpportunitiesRequest,
|
|
current_user: dict = Depends(get_current_user)
|
|
) -> Union[BaseResponse, ErrorResponse]:
|
|
"""
|
|
Generate detailed content opportunities report from GSC data.
|
|
|
|
Identifies high-priority content gaps and optimization opportunities:
|
|
- Queries with high volume but low CTR (meta/title optimization)
|
|
- Keywords ranking 4-10 (ready for ranking improvement)
|
|
- Long-tail keywords with expansion potential
|
|
- Competitive white space analysis
|
|
|
|
For each opportunity includes:
|
|
- Current position and metrics
|
|
- Estimated traffic gain
|
|
- Optimization strategy
|
|
- Implementation difficulty
|
|
- Phased roadmap (Phase 1, 2, 3)
|
|
"""
|
|
start_time = datetime.utcnow()
|
|
|
|
try:
|
|
logger.info(f"Generating content opportunities for {request.site_url}")
|
|
|
|
gsc_service = GSCAnalyzerService()
|
|
report = await gsc_service.get_content_opportunities_report(
|
|
site_url=str(request.site_url),
|
|
min_impressions=request.min_impressions,
|
|
date_range_days=request.date_range_days
|
|
)
|
|
|
|
execution_time = (datetime.utcnow() - start_time).total_seconds()
|
|
|
|
return BaseResponse(
|
|
success=True,
|
|
message="Content opportunities report generated",
|
|
execution_time=execution_time,
|
|
data=report
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Content opportunities report failed: {str(e)}", exc_info=True)
|
|
return await handle_seo_tool_exception("get_content_opportunities_report", e, request.dict())
|
|
|
|
|
|
@router.get("/enterprise/health", response_model=BaseResponse)
|
|
@log_api_call
|
|
async def check_enterprise_services_health() -> BaseResponse:
|
|
"""Health check for enterprise services"""
|
|
try:
|
|
enterprise_service = EnterpriseSEOService()
|
|
gsc_service = GSCAnalyzerService()
|
|
|
|
enterprise_health = await enterprise_service.health_check()
|
|
gsc_health = await gsc_service.health_check()
|
|
|
|
return BaseResponse(
|
|
success=True,
|
|
message="Enterprise services health check completed",
|
|
data={
|
|
"enterprise_seo_service": enterprise_health,
|
|
"gsc_analyzer_service": gsc_health,
|
|
"timestamp": datetime.utcnow().isoformat()
|
|
}
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Enterprise health check failed: {str(e)}")
|
|
return BaseResponse(
|
|
success=False,
|
|
message="Enterprise health check failed",
|
|
data={"error": str(e)}
|
|
)
|
|
|
|
|
|
# ==================== LLM INSIGHTS ENDPOINTS (Phase 2A.2) ====================
|
|
|
|
@router.post("/llm/generate-audit-insights", response_model=BaseResponse)
|
|
@log_api_call
|
|
async def generate_audit_insights(
|
|
request: EnterpriseAuditInsightsRequest,
|
|
background_tasks: BackgroundTasks,
|
|
current_user: dict = Depends(get_current_user)
|
|
) -> Union[BaseResponse, ErrorResponse]:
|
|
"""
|
|
Generate AI-powered insights from enterprise SEO audit results.
|
|
|
|
Analyzes audit findings and produces strategic, actionable insights with:
|
|
- Priority scoring (1-10 scale)
|
|
- Traffic impact projections
|
|
- Implementation difficulty assessments
|
|
- Step-by-step action guides
|
|
- Required tools and resources
|
|
"""
|
|
start_time = datetime.utcnow()
|
|
|
|
try:
|
|
logger.info(f"Generating audit insights for {request.website_url}")
|
|
|
|
llm_service = LLMInsightsService()
|
|
insights = await llm_service.generate_enterprise_audit_insights(
|
|
audit_results=request.audit_results,
|
|
website_url=request.website_url,
|
|
target_keywords=request.target_keywords
|
|
)
|
|
|
|
execution_time = (datetime.utcnow() - start_time).total_seconds()
|
|
|
|
# Log successful operation
|
|
log_data = {
|
|
"operation": "audit_insights_generation",
|
|
"website_url": request.website_url,
|
|
"insights_generated": len(insights.get('insights', [])),
|
|
"execution_time": execution_time,
|
|
"success": True
|
|
}
|
|
background_tasks.add_task(save_to_file, f"{LOG_DIR}/llm_operations.jsonl", log_data)
|
|
|
|
return BaseResponse(
|
|
success=True,
|
|
message="Audit insights generated successfully",
|
|
execution_time=execution_time,
|
|
data=insights
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Audit insights generation failed: {str(e)}", exc_info=True)
|
|
return await handle_seo_tool_exception("generate_audit_insights", e, {"website_url": request.website_url})
|
|
|
|
|
|
@router.post("/llm/generate-gsc-insights", response_model=BaseResponse)
|
|
@log_api_call
|
|
async def generate_gsc_insights(
|
|
request: GSCAnalysisInsightsRequest,
|
|
background_tasks: BackgroundTasks,
|
|
current_user: dict = Depends(get_current_user)
|
|
) -> Union[BaseResponse, ErrorResponse]:
|
|
"""
|
|
Generate strategic insights from GSC search performance analysis.
|
|
|
|
Produces targeted, actionable insights including:
|
|
- Keyword optimization opportunities
|
|
- Content ranking improvement strategies
|
|
- CTR enhancement tactics
|
|
- Competitive positioning analysis
|
|
- Quick-win identification
|
|
"""
|
|
start_time = datetime.utcnow()
|
|
|
|
try:
|
|
logger.info(f"Generating GSC insights for {request.website_url}")
|
|
|
|
llm_service = LLMInsightsService()
|
|
insights = await llm_service.generate_gsc_analysis_insights(
|
|
gsc_analysis=request.gsc_analysis,
|
|
website_url=request.website_url
|
|
)
|
|
|
|
execution_time = (datetime.utcnow() - start_time).total_seconds()
|
|
|
|
log_data = {
|
|
"operation": "gsc_insights_generation",
|
|
"website_url": request.website_url,
|
|
"insights_generated": len(insights.get('insights', [])),
|
|
"execution_time": execution_time,
|
|
"success": True
|
|
}
|
|
background_tasks.add_task(save_to_file, f"{LOG_DIR}/llm_operations.jsonl", log_data)
|
|
|
|
return BaseResponse(
|
|
success=True,
|
|
message="GSC insights generated successfully",
|
|
execution_time=execution_time,
|
|
data=insights
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"GSC insights generation failed: {str(e)}", exc_info=True)
|
|
return await handle_seo_tool_exception("generate_gsc_insights", e, {"website_url": request.website_url})
|
|
|
|
|
|
@router.post("/llm/generate-content-strategy", response_model=BaseResponse)
|
|
@log_api_call
|
|
async def generate_content_strategy(
|
|
request: ContentStrategyRequest,
|
|
background_tasks: BackgroundTasks,
|
|
current_user: dict = Depends(get_current_user)
|
|
) -> Union[BaseResponse, ErrorResponse]:
|
|
"""
|
|
Generate comprehensive content strategy with AI recommendations.
|
|
|
|
Creates detailed strategy including:
|
|
- Content gap analysis and solutions
|
|
- Content calendar recommendations
|
|
- Keyword-to-content mapping
|
|
- Competitive content benchmarking
|
|
- Topic cluster suggestions
|
|
- Publishing frequency recommendations
|
|
"""
|
|
start_time = datetime.utcnow()
|
|
|
|
try:
|
|
logger.info(f"Generating content strategy ({len(request.content_gaps)} gaps)")
|
|
|
|
llm_service = LLMInsightsService()
|
|
strategy = await llm_service.generate_content_strategy_insights(
|
|
current_content=request.current_content,
|
|
content_gaps=request.content_gaps,
|
|
target_keywords=request.target_keywords,
|
|
competitor_content=request.competitor_content
|
|
)
|
|
|
|
execution_time = (datetime.utcnow() - start_time).total_seconds()
|
|
|
|
log_data = {
|
|
"operation": "content_strategy_generation",
|
|
"gaps_addressed": len(request.content_gaps),
|
|
"keywords_analyzed": len(request.target_keywords),
|
|
"execution_time": execution_time,
|
|
"success": True
|
|
}
|
|
background_tasks.add_task(save_to_file, f"{LOG_DIR}/llm_operations.jsonl", log_data)
|
|
|
|
return BaseResponse(
|
|
success=True,
|
|
message="Content strategy generated successfully",
|
|
execution_time=execution_time,
|
|
data=strategy
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Content strategy generation failed: {str(e)}", exc_info=True)
|
|
return await handle_seo_tool_exception("generate_content_strategy", e, {"gaps_count": len(request.content_gaps)})
|
|
|
|
|
|
@router.post("/llm/generate-traffic-roadmap", response_model=BaseResponse)
|
|
@log_api_call
|
|
async def generate_traffic_roadmap(
|
|
request: TrafficRoadmapRequest,
|
|
background_tasks: BackgroundTasks,
|
|
current_user: dict = Depends(get_current_user)
|
|
) -> Union[BaseResponse, ErrorResponse]:
|
|
"""
|
|
Generate phased traffic improvement roadmap with projections.
|
|
|
|
Produces detailed roadmap with:
|
|
- Phased implementation plan (Week 1, 2, 3+)
|
|
- Traffic gain projections per phase
|
|
- Priority-ordered action items
|
|
- Resource requirements per phase
|
|
- Key performance indicators (KPIs)
|
|
- Success metrics and validation points
|
|
- Risk mitigation strategies
|
|
"""
|
|
start_time = datetime.utcnow()
|
|
|
|
try:
|
|
logger.info(f"Generating traffic roadmap ({request.implementation_timeline_weeks} weeks)")
|
|
|
|
llm_service = LLMInsightsService()
|
|
roadmap = await llm_service.generate_traffic_improvement_roadmap(
|
|
current_metrics=request.current_metrics,
|
|
identified_opportunities=request.identified_opportunities,
|
|
implementation_timeline_weeks=request.implementation_timeline_weeks
|
|
)
|
|
|
|
execution_time = (datetime.utcnow() - start_time).total_seconds()
|
|
|
|
log_data = {
|
|
"operation": "traffic_roadmap_generation",
|
|
"timeline_weeks": request.implementation_timeline_weeks,
|
|
"opportunities_count": len(request.identified_opportunities),
|
|
"execution_time": execution_time,
|
|
"success": True
|
|
}
|
|
background_tasks.add_task(save_to_file, f"{LOG_DIR}/llm_operations.jsonl", log_data)
|
|
|
|
return BaseResponse(
|
|
success=True,
|
|
message="Traffic roadmap generated successfully",
|
|
execution_time=execution_time,
|
|
data=roadmap
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Traffic roadmap generation failed: {str(e)}", exc_info=True)
|
|
return await handle_seo_tool_exception("generate_traffic_roadmap", e,
|
|
{"opportunities_count": len(request.identified_opportunities)})
|
|
|
|
|
|
@router.post("/llm/generate-competitive-insights", response_model=BaseResponse)
|
|
@log_api_call
|
|
async def generate_competitive_insights(
|
|
request: CompetitiveInsightsRequest,
|
|
background_tasks: BackgroundTasks,
|
|
current_user: dict = Depends(get_current_user)
|
|
) -> Union[BaseResponse, ErrorResponse]:
|
|
"""
|
|
Generate competitive positioning and intelligence insights.
|
|
|
|
Analyzes competitive landscape and provides:
|
|
- Competitive advantage identification
|
|
- Competitive gap analysis
|
|
- Market opportunity identification
|
|
- Threat assessment
|
|
- Win strategy recommendations
|
|
- Differentiation recommendations
|
|
- Market position recommendations
|
|
"""
|
|
start_time = datetime.utcnow()
|
|
|
|
try:
|
|
logger.info(f"Generating competitive insights ({len(request.competitor_analyses)} competitors)")
|
|
|
|
llm_service = LLMInsightsService()
|
|
insights = await llm_service.generate_competitive_insights(
|
|
primary_site_analysis=request.primary_site_analysis,
|
|
competitor_analyses=request.competitor_analyses
|
|
)
|
|
|
|
execution_time = (datetime.utcnow() - start_time).total_seconds()
|
|
|
|
log_data = {
|
|
"operation": "competitive_insights_generation",
|
|
"competitors_analyzed": len(request.competitor_analyses),
|
|
"execution_time": execution_time,
|
|
"success": True
|
|
}
|
|
background_tasks.add_task(save_to_file, f"{LOG_DIR}/llm_operations.jsonl", log_data)
|
|
|
|
return BaseResponse(
|
|
success=True,
|
|
message="Competitive insights generated successfully",
|
|
execution_time=execution_time,
|
|
data=insights
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Competitive insights generation failed: {str(e)}", exc_info=True)
|
|
return await handle_seo_tool_exception("generate_competitive_insights", e,
|
|
{"competitors_count": len(request.competitor_analyses)})
|
|
|
|
|
|
@router.post("/llm/prioritized-recommendations", response_model=BaseResponse)
|
|
@log_api_call
|
|
async def get_prioritized_recommendations(
|
|
request: PrioritizedRecommendationsRequest,
|
|
background_tasks: BackgroundTasks,
|
|
current_user: dict = Depends(get_current_user)
|
|
) -> Union[BaseResponse, ErrorResponse]:
|
|
"""
|
|
Get AI-prioritized recommendations ranked by business impact.
|
|
|
|
Scores and prioritizes recommendations by:
|
|
- Traffic impact potential
|
|
- Implementation effort required
|
|
- Resource requirements
|
|
- Timeline to implementation
|
|
- Business alignment
|
|
- Risk level
|
|
- ROI potential
|
|
|
|
Returns categorized as: Quick Wins | High Impact | Long-term
|
|
"""
|
|
start_time = datetime.utcnow()
|
|
|
|
try:
|
|
logger.info(f"Prioritizing {len(request.all_recommendations)} recommendations")
|
|
|
|
llm_service = LLMInsightsService()
|
|
prioritized = await llm_service.generate_prioritized_recommendations(
|
|
all_recommendations=request.all_recommendations,
|
|
business_context=request.business_context
|
|
)
|
|
|
|
execution_time = (datetime.utcnow() - start_time).total_seconds()
|
|
|
|
log_data = {
|
|
"operation": "prioritized_recommendations_generation",
|
|
"total_recommendations": len(request.all_recommendations),
|
|
"quick_wins": len(prioritized.get('quick_wins', [])),
|
|
"execution_time": execution_time,
|
|
"success": True
|
|
}
|
|
background_tasks.add_task(save_to_file, f"{LOG_DIR}/llm_operations.jsonl", log_data)
|
|
|
|
return BaseResponse(
|
|
success=True,
|
|
message="Recommendations prioritized successfully",
|
|
execution_time=execution_time,
|
|
data=prioritized
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Recommendation prioritization failed: {str(e)}", exc_info=True)
|
|
return await handle_seo_tool_exception("get_prioritized_recommendations", e,
|
|
{"recommendations_count": len(request.all_recommendations)})
|
|
|
|
|
|
@router.post("/llm/quick-wins", response_model=BaseResponse)
|
|
@log_api_call
|
|
async def identify_quick_wins(
|
|
request: QuickWinsRequest,
|
|
background_tasks: BackgroundTasks,
|
|
current_user: dict = Depends(get_current_user)
|
|
) -> Union[BaseResponse, ErrorResponse]:
|
|
"""
|
|
Identify quick wins - high-impact actions implementable in short timeframe.
|
|
|
|
Finds high-ROI quick wins including:
|
|
- Meta tag optimization opportunities
|
|
- URL structure improvements
|
|
- On-page optimization quick fixes
|
|
- Internal linking recommendations
|
|
- Content formatting improvements
|
|
- Technical SEO quick fixes
|
|
- Performance optimization opportunities
|
|
|
|
Each with: estimated traffic gain, implementation time, tools needed, expected outcomes
|
|
"""
|
|
start_time = datetime.utcnow()
|
|
|
|
try:
|
|
logger.info(f"Identifying quick wins (max {request.max_days_to_implement} days)")
|
|
|
|
llm_service = LLMInsightsService()
|
|
quick_wins = await llm_service.generate_quick_wins(
|
|
audit_data=request.audit_data,
|
|
max_days_to_implement=request.max_days_to_implement
|
|
)
|
|
|
|
execution_time = (datetime.utcnow() - start_time).total_seconds()
|
|
|
|
log_data = {
|
|
"operation": "quick_wins_identification",
|
|
"max_days": request.max_days_to_implement,
|
|
"quick_wins_found": len(quick_wins.get('quick_wins', [])),
|
|
"total_potential_traffic": quick_wins.get('total_potential_traffic', 0),
|
|
"execution_time": execution_time,
|
|
"success": True
|
|
}
|
|
background_tasks.add_task(save_to_file, f"{LOG_DIR}/llm_operations.jsonl", log_data)
|
|
|
|
return BaseResponse(
|
|
success=True,
|
|
message="Quick wins identified successfully",
|
|
execution_time=execution_time,
|
|
data=quick_wins
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Quick wins identification failed: {str(e)}", exc_info=True)
|
|
return await handle_seo_tool_exception("identify_quick_wins", e,
|
|
{"max_days": request.max_days_to_implement})
|
|
|
|
|
|
@router.post("/llm/keyword-expansion", response_model=BaseResponse)
|
|
@log_api_call
|
|
async def expand_keywords(
|
|
request: KeywordExpansionRequest,
|
|
background_tasks: BackgroundTasks,
|
|
current_user: dict = Depends(get_current_user)
|
|
) -> Union[BaseResponse, ErrorResponse]:
|
|
"""
|
|
Expand keyword list with AI-generated related and long-tail keywords.
|
|
|
|
Generates 15-20 additional keywords including:
|
|
- Long-tail keyword variations
|
|
- Question-based keywords (People Also Ask)
|
|
- Local keyword variations
|
|
- Intent-based keywords (commercial, informational, navigational)
|
|
- Seasonal keyword variants
|
|
|
|
Each keyword includes: search volume estimate, difficulty score, relevance, content opportunity
|
|
"""
|
|
start_time = datetime.utcnow()
|
|
|
|
try:
|
|
logger.info(f"Expanding keywords from {len(request.current_keywords)} base keywords")
|
|
|
|
llm_service = LLMInsightsService()
|
|
expansion = await llm_service.generate_keyword_expansion(
|
|
current_keywords=request.current_keywords,
|
|
content_analysis=request.content_analysis,
|
|
target_difficulty=request.target_difficulty
|
|
)
|
|
|
|
execution_time = (datetime.utcnow() - start_time).total_seconds()
|
|
|
|
log_data = {
|
|
"operation": "keyword_expansion",
|
|
"original_keywords": len(request.current_keywords),
|
|
"expanded_keywords": expansion.get('expanded_keywords', 0),
|
|
"execution_time": execution_time,
|
|
"success": True
|
|
}
|
|
background_tasks.add_task(save_to_file, f"{LOG_DIR}/llm_operations.jsonl", log_data)
|
|
|
|
return BaseResponse(
|
|
success=True,
|
|
message="Keyword expansion completed successfully",
|
|
execution_time=execution_time,
|
|
data=expansion
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Keyword expansion failed: {str(e)}", exc_info=True)
|
|
return await handle_seo_tool_exception("expand_keywords", e,
|
|
{"keywords_count": len(request.current_keywords)})
|
|
|
|
|
|
@router.get("/llm/health", response_model=BaseResponse)
|
|
@log_api_call
|
|
async def check_llm_insights_health() -> BaseResponse:
|
|
"""Health check for LLM insights service"""
|
|
try:
|
|
llm_service = LLMInsightsService()
|
|
health = await llm_service.health_check()
|
|
|
|
return BaseResponse(
|
|
success=True,
|
|
message="LLM insights service is healthy",
|
|
data={
|
|
"service": health.get('service'),
|
|
"version": health.get('version'),
|
|
"llm_integration": health.get('llm_integration'),
|
|
"timestamp": health.get('last_check')
|
|
}
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"LLM insights health check failed: {str(e)}")
|
|
return BaseResponse(
|
|
success=False,
|
|
message="LLM insights service health check failed",
|
|
data={"error": str(e)}
|
|
)
|