"""
Step 4 Persona Generation Routes

Handles AI writing persona generation using the sophisticated persona system.
"""

import asyncio
import os
import traceback
import uuid
from contextlib import contextmanager
from datetime import datetime, timedelta
from typing import Dict, Any, Iterator, List, Optional, Union

from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks
from pydantic import BaseModel
from loguru import logger

from services.persona.core_persona.core_persona_service import CorePersonaService
from services.persona.enhanced_linguistic_analyzer import EnhancedLinguisticAnalyzer
from services.persona.persona_quality_improver import PersonaQualityImprover
from middleware.auth_middleware import get_current_user
from services.user_api_key_context import user_api_keys

# Rate limiting configuration
RATE_LIMIT_DELAY_SECONDS = 2.0  # Delay between API calls to prevent quota exhaustion
CORE_PERSONA_COOLDOWN_SECONDS = 1.0  # Pause after the core persona call before platform calls

# Task management for long-running persona generation.
# In-memory task storage (in production, use Redis or database)
persona_tasks: Dict[str, Dict[str, Any]] = {}
PERSONA_TASK_TTL_HOURS = 1  # Tasks older than this are reported as expired and evicted

# In-memory latest persona cache per user (24h TTL)
persona_latest_cache: Dict[str, Dict[str, Any]] = {}
PERSONA_CACHE_TTL_HOURS = 24

# Lower-cased substrings that mark an error message as a quota / rate-limit failure.
_RATE_LIMIT_MARKERS = ("resource_exhausted", "429", "quota", "rate limit")

# Provider name (as returned by user_api_keys) -> environment variable injected
# for the duration of a background generation task.
_PROVIDER_ENV_VARS = {
    'gemini': 'GEMINI_API_KEY',
    'exa': 'EXA_API_KEY',
    'openai': 'OPENAI_API_KEY',
    'anthropic': 'ANTHROPIC_API_KEY',
    'mistral': 'MISTRAL_API_KEY',
    'copilotkit': 'COPILOTKIT_API_KEY',
    'tavily': 'TAVILY_API_KEY',
    'serper': 'SERPER_API_KEY',
    'firecrawl': 'FIRECRAWL_API_KEY',
}

router = APIRouter()

# Initialize services
core_persona_service = CorePersonaService()
linguistic_analyzer = EnhancedLinguisticAnalyzer()
quality_improver = PersonaQualityImprover()


def _extract_user_id(user: Dict[str, Any]) -> str:
    """Extract a stable user ID from Clerk-authenticated user payloads.

    Prefers 'clerk_user_id' or 'id', falls back to 'user_id', else 'unknown'.
    """
    if not isinstance(user, dict):
        return 'unknown'
    return (
        user.get('clerk_user_id')
        or user.get('id')
        or user.get('user_id')
        or 'unknown'
    )


class PersonaGenerationRequest(BaseModel):
    """Request model for persona generation."""
    onboarding_data: Dict[str, Any]
    selected_platforms: List[str] = ["linkedin", "blog"]
    user_preferences: Optional[Dict[str, Any]] = None


class PersonaGenerationResponse(BaseModel):
    """Response model for persona generation."""
    success: bool
    core_persona: Optional[Dict[str, Any]] = None
    platform_personas: Optional[Dict[str, Any]] = None
    quality_metrics: Optional[Dict[str, Any]] = None
    error: Optional[str] = None


class PersonaQualityRequest(BaseModel):
    """Request model for persona quality assessment."""
    core_persona: Dict[str, Any]
    platform_personas: Dict[str, Any]
    user_feedback: Optional[Dict[str, Any]] = None


class PersonaQualityResponse(BaseModel):
    """Response model for persona quality assessment."""
    success: bool
    quality_metrics: Optional[Dict[str, Any]] = None
    recommendations: Optional[List[str]] = None
    error: Optional[str] = None


class PersonaTaskStatus(BaseModel):
    """Response model for persona generation task status."""
    task_id: str
    status: str  # 'pending', 'running', 'completed', 'failed'
    progress: int  # 0-100
    current_step: str
    progress_messages: List[Dict[str, Any]] = []
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    created_at: str
    updated_at: str


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------

def _coerce_generation_request(
    request: Union[PersonaGenerationRequest, Dict[str, Any]]
) -> PersonaGenerationRequest:
    """Return *request* as a PersonaGenerationRequest, validating raw dict bodies."""
    if isinstance(request, dict):
        return PersonaGenerationRequest(**request)
    return request


def _parse_iso_timestamp(value: Any) -> Optional[datetime]:
    """Parse an ISO-8601 string; return None for non-strings or malformed values."""
    if not isinstance(value, str):
        return None
    try:
        return datetime.fromisoformat(value)
    except ValueError:
        return None


def _is_cache_entry_fresh(entry: Dict[str, Any]) -> bool:
    """True when *entry* has a parseable 'timestamp' within PERSONA_CACHE_TTL_HOURS."""
    saved_at = _parse_iso_timestamp(entry.get("timestamp"))
    return saved_at is not None and (
        datetime.now() - saved_at <= timedelta(hours=PERSONA_CACHE_TTL_HOURS)
    )


def _is_task_expired(task: Dict[str, Any]) -> bool:
    """True when *task* is older than PERSONA_TASK_TTL_HOURS or lacks a valid 'created_at'."""
    created_at = _parse_iso_timestamp(task.get("created_at"))
    return created_at is None or (
        datetime.now() - created_at > timedelta(hours=PERSONA_TASK_TTL_HOURS)
    )


def _purge_expired_tasks() -> None:
    """Evict expired tasks so entries that are never polled do not accumulate forever."""
    expired_ids = [tid for tid, task in persona_tasks.items() if _is_task_expired(task)]
    for expired_id in expired_ids:
        persona_tasks.pop(expired_id, None)


def _is_rate_limit_error(message: Any) -> bool:
    """Heuristically detect quota / rate-limit failures from an error message."""
    text = str(message).lower()
    return any(marker in text for marker in _RATE_LIMIT_MARKERS)


def _core_persona_error(core_persona: Any) -> Optional[str]:
    """Return the error message of a failed core persona result, or None on success.

    A result is a failure when it is not a dict or when it carries an "error" key.
    """
    if not isinstance(core_persona, dict):
        return f"unexpected result type {type(core_persona).__name__}"
    if "error" in core_persona:
        return str(core_persona["error"])
    return None


async def _generate_core_persona(onboarding_data: Dict[str, Any]) -> Any:
    """Run the blocking core persona service call in the default executor (1 API call)."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None, core_persona_service.generate_core_persona, onboarding_data
    )


def _record_platform_result(platform_personas: Dict[str, Any], platform: str, result: Any) -> None:
    """Store one platform result in *platform_personas*, logging failures.

    Failures are always stored as dicts containing an "error" key, so callers can
    count successes with ``"error" not in persona``.
    """
    if not isinstance(result, dict):
        if isinstance(result, Exception):
            error_msg = str(result)
        else:
            error_msg = f"Unexpected result type: {type(result).__name__}"
        logger.error(f"Platform {platform} generation failed: {error_msg}")
        platform_personas[platform] = {"error": error_msg}
    elif "error" in result:
        error_msg = str(result["error"])
        logger.error(f"Platform {platform} generation failed: {error_msg}")
        platform_personas[platform] = result
        # Check for rate limit errors and suggest retry
        if _is_rate_limit_error(error_msg):
            logger.warning(f"⚠️ Rate limit detected for {platform}. Consider increasing RATE_LIMIT_DELAY_SECONDS")
    else:
        platform_personas[platform] = result
        logger.info(f"✅ {platform} persona generated successfully")


async def _generate_platform_personas(
    core_persona: Dict[str, Any],
    persona_request: PersonaGenerationRequest,
    task_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Generate every selected platform persona sequentially, with rate limiting.

    Platforms are processed one at a time, waiting RATE_LIMIT_DELAY_SECONDS
    between calls. A failure on one platform is recorded and does not stop the rest.

    Args:
        core_persona: The successfully generated core persona.
        persona_request: Request holding selected platforms and onboarding data.
        task_id: When given, progress is reported through update_task_status
            (progress values between 50 and 90); otherwise it is only logged.

    Returns:
        Mapping of platform name to its persona dict, or to a dict with "error".
    """
    platforms = persona_request.selected_platforms
    total = len(platforms)
    platform_personas: Dict[str, Any] = {}

    def report(progress: int, message: str) -> None:
        if task_id is None:
            logger.info(message)
        else:
            update_task_status(task_id, "running", progress, message)

    for index, platform in enumerate(platforms):
        try:
            progress = 50 + (index * 40 // total)  # total > 0 inside the loop
            report(progress, f"Generating {platform} persona ({index + 1}/{total})")

            # Add delay between API calls to prevent rate limiting (none before the first)
            if index > 0:
                report(progress, f"Rate limiting: Waiting {RATE_LIMIT_DELAY_SECONDS}s before next API call...")
                await asyncio.sleep(RATE_LIMIT_DELAY_SECONDS)

            result = await generate_single_platform_persona_async(
                core_persona, platform, persona_request.onboarding_data
            )
            _record_platform_result(platform_personas, platform, result)
        except Exception as e:
            logger.error(f"Platform {platform} generation error: {str(e)}")
            platform_personas[platform] = {"error": str(e)}

    return platform_personas


def _log_generation_summary(platform_personas: Dict[str, Any], total_platforms: int) -> None:
    """Log success counts, API-call count and the rate-limit delay for one generation run."""
    successful = sum(1 for persona in platform_personas.values() if "error" not in persona)
    logger.info(f"✅ Persona generation completed: {successful}/{total_platforms} platforms successful")
    logger.info(f"📊 API calls made: 1 (core) + {total_platforms} (platforms) = {1 + total_platforms} total")
    logger.info(f"⏱️ Rate limiting: Sequential processing with {RATE_LIMIT_DELAY_SECONDS}s delays to prevent quota exhaustion")


@contextmanager
def _user_api_keys_in_env(user_id: str) -> Iterator[None]:
    """Expose the user's API keys as environment variables for the block's duration.

    Only providers with a non-empty key are injected. Every overwritten variable is
    restored on exit, or removed if it was previously unset, even if the body raises.

    NOTE(review): os.environ is process-global. Overlapping background tasks for
    different users can observe each other's keys while either one is awaiting,
    and their restores can interleave. Passing keys to the services explicitly
    (or serialising this section) would remove that risk.
    """
    original_env: Dict[str, Optional[str]] = {}
    with user_api_keys(user_id) as keys:
        try:
            for provider, env_var in _PROVIDER_ENV_VARS.items():
                value = keys.get(provider)
                if value:
                    original_env[env_var] = os.environ.get(env_var)
                    os.environ[env_var] = value
                    logger.debug(f"[BG TASK] Injected {env_var} for user {user_id}")
            yield
        finally:
            for env_var, original_value in original_env.items():
                if original_value is None:
                    os.environ.pop(env_var, None)
                else:
                    os.environ[env_var] = original_value
            logger.debug(f"[BG TASK] Restored environment for user {user_id}")


# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------

@router.post("/step4/generate-personas-async", response_model=Dict[str, str])
async def generate_writing_personas_async(
    request: Union[PersonaGenerationRequest, Dict[str, Any]],
    current_user: Dict[str, Any] = Depends(get_current_user),
    background_tasks: BackgroundTasks = BackgroundTasks()
):
    """
    Start persona generation as an async task and return task ID for polling.

    If a fresh (within PERSONA_CACHE_TTL_HOURS) cached persona exists for the user,
    a task that is already "completed" with the cached result is returned instead.
    """
    try:
        persona_request = _coerce_generation_request(request)
        user_id = _extract_user_id(current_user)

        # Opportunistically drop tasks nobody polled, so storage stays bounded.
        _purge_expired_tasks()

        # If fresh cache exists for this user, short-circuit and return a completed task
        cached = persona_latest_cache.get(user_id)
        if cached and _is_cache_entry_fresh(cached):
            task_id = str(uuid.uuid4())
            now = datetime.now().isoformat()
            persona_tasks[task_id] = {
                "task_id": task_id,
                "status": "completed",
                "progress": 100,
                "current_step": "Persona loaded from cache",
                "progress_messages": [
                    {"timestamp": now, "message": "Loaded cached persona", "progress": 100}
                ],
                "result": {
                    "success": True,
                    "core_persona": cached.get("core_persona"),
                    "platform_personas": cached.get("platform_personas", {}),
                    "quality_metrics": cached.get("quality_metrics", {}),
                },
                "error": None,
                "created_at": now,
                "updated_at": now,
                "user_id": user_id,
                "request_data": persona_request.dict(),
            }
            logger.info(f"Cache hit for user {user_id} - returning completed task without regeneration: {task_id}")
            return {
                "task_id": task_id,
                "status": "completed",
                "message": "Persona loaded from cache"
            }

        task_id = str(uuid.uuid4())
        now = datetime.now().isoformat()
        persona_tasks[task_id] = {
            "task_id": task_id,
            "status": "pending",
            "progress": 0,
            "current_step": "Initializing persona generation...",
            "progress_messages": [],
            "result": None,
            "error": None,
            "created_at": now,
            "updated_at": now,
            "user_id": user_id,
            "request_data": persona_request.dict(),
        }

        # FastAPI injects BackgroundTasks by type annotation; the default here is never used.
        background_tasks.add_task(
            execute_persona_generation_task,
            task_id,
            persona_request,
            current_user
        )

        logger.info(f"Started async persona generation task: {task_id}")
        logger.info(f"Background task added successfully for task: {task_id}")

        return {
            "task_id": task_id,
            "status": "pending",
            "message": "Persona generation started. Use task_id to poll for progress."
        }

    except Exception as e:
        logger.error(f"Failed to start persona generation task: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to start task: {str(e)}")


def _persona_unavailable(detail: str, status_code: int) -> Dict[str, Any]:
    """Build the success=False body get_latest_persona returns instead of raising."""
    logger.warning(f"Persona retrieval notice (returning success=False): {detail}")
    return {
        "success": False,
        "persona": None,
        "message": detail,
        "status_code": status_code
    }


@router.get("/step4/persona-latest", response_model=Dict[str, Any])
async def get_latest_persona(current_user: Dict[str, Any] = Depends(get_current_user)):
    """Return latest cached persona for the current user if available and fresh.

    Always answers HTTP 200: "not found" and "expired" are reported as
    success=False with a status_code field. This prevents frontend connection
    errors when the endpoint is called during an auto-initialization phase.
    Entries that are expired or have an unparseable timestamp are evicted.
    """
    try:
        user_id = _extract_user_id(current_user)
        cached = persona_latest_cache.get(user_id)
        if not cached:
            return _persona_unavailable("No cached persona found", 404)

        if not _is_cache_entry_fresh(cached):
            persona_latest_cache.pop(user_id, None)
            return _persona_unavailable("Cached persona expired", 404)

        return {"success": True, "persona": cached}
    except Exception as e:
        # loguru has no exc_info kwarg; logger.exception attaches the traceback.
        logger.exception(f"Error getting latest persona: {e}")
        return {
            "success": False,
            "persona": None,
            "message": f"Internal error retrieving persona: {str(e)}",
            "status_code": 500
        }


@router.post("/step4/persona-save", response_model=Dict[str, Any])
async def save_persona_update(
    request: Dict[str, Any],
    current_user: Dict[str, Any] = Depends(get_current_user)
):
    """Save/overwrite latest persona cache for current user (from edited UI).

    A payload without a core persona is rejected. Caching it would make the async
    endpoint serve a "completed" persona with core_persona=None until the TTL expires.
    """
    try:
        user_id = _extract_user_id(current_user)
        core_persona = request.get("core_persona")
        if not core_persona:
            logger.warning(f"Refusing to cache persona without core_persona for user {user_id}")
            return {
                "success": False,
                "message": "Failed to save persona: core_persona is required",
                "status_code": 400
            }

        persona_latest_cache[user_id] = {
            "success": True,
            "core_persona": core_persona,
            "platform_personas": request.get("platform_personas", {}),
            "quality_metrics": request.get("quality_metrics", {}),
            "selected_platforms": request.get("selected_platforms", []),
            "timestamp": datetime.now().isoformat()
        }
        logger.info(f"Saved latest persona to cache for user {user_id}")
        return {"success": True}
    except Exception as e:
        logger.exception(f"Error saving latest persona: {e}")
        return {
            "success": False,
            "message": f"Failed to save persona: {str(e)}",
            "status_code": 500
        }


@router.get("/step4/persona-task/{task_id}", response_model=PersonaTaskStatus)
async def get_persona_task_status(task_id: str):
    """
    Get the status of a persona generation task.

    Tasks older than PERSONA_TASK_TTL_HOURS are evicted and reported as expired (404).

    NOTE(review): this endpoint is not authenticated and does not check the
    task's user_id, so anyone holding a task_id can read its result.
    """
    task = persona_tasks.get(task_id)
    if task is None:
        raise HTTPException(status_code=404, detail="Task not found")

    if _is_task_expired(task):
        persona_tasks.pop(task_id, None)
        raise HTTPException(status_code=404, detail="Task expired")

    return PersonaTaskStatus(**task)


@router.post("/step4/generate-personas", response_model=PersonaGenerationResponse)
async def generate_writing_personas(
    request: Union[PersonaGenerationRequest, Dict[str, Any]],
    current_user: Dict[str, Any] = Depends(get_current_user)
):
    """
    Generate AI writing personas synchronously using the sophisticated persona system.

    APPROACH:
    1. Generate core persona (1 API call)
    2. Sequential platform adaptations (1 API call per platform, spaced by
       RATE_LIMIT_DELAY_SECONDS to avoid quota exhaustion)
    3. Quality assessment (no additional API calls - uses existing data)

    Total API calls: 1 + N platforms
    """
    try:
        logger.info(f"Starting OPTIMIZED persona generation for user: {_extract_user_id(current_user)}")

        persona_request = _coerce_generation_request(request)
        logger.info(f"Selected platforms: {persona_request.selected_platforms}")

        # Step 1: Generate core persona (1 API call)
        logger.info("Step 1: Generating core persona...")
        core_persona = await _generate_core_persona(persona_request.onboarding_data)

        error_msg = _core_persona_error(core_persona)
        if error_msg is not None:
            logger.error(f"Core persona generation failed: {error_msg}")
            return PersonaGenerationResponse(
                success=False,
                error=f"Core persona generation failed: {error_msg}"
            )

        # Small pause before the platform calls (only needed when more calls follow)
        await asyncio.sleep(CORE_PERSONA_COOLDOWN_SECONDS)

        # Step 2: Generate platform adaptations with rate limiting (N API calls with delays)
        logger.info(f"Step 2: Generating platform adaptations with rate limiting for: {persona_request.selected_platforms}")
        platform_personas = await _generate_platform_personas(core_persona, persona_request)

        # Step 3: Assess quality (no additional API calls - uses existing data)
        logger.info("Step 3: Assessing persona quality...")
        quality_metrics = await assess_persona_quality_internal(
            core_persona,
            platform_personas,
            persona_request.user_preferences
        )

        _log_generation_summary(platform_personas, len(persona_request.selected_platforms))

        return PersonaGenerationResponse(
            success=True,
            core_persona=core_persona,
            platform_personas=platform_personas,
            quality_metrics=quality_metrics
        )

    except Exception as e:
        logger.error(f"Persona generation error: {str(e)}")
        return PersonaGenerationResponse(
            success=False,
            error=f"Persona generation failed: {str(e)}"
        )


@router.post("/step4/assess-quality", response_model=PersonaQualityResponse)
async def assess_persona_quality(
    request: Union[PersonaQualityRequest, Dict[str, Any]],
    current_user: Dict[str, Any] = Depends(get_current_user)
):
    """
    Assess the quality of generated personas and provide improvement recommendations.
    """
    try:
        logger.info(f"Assessing persona quality for user: {_extract_user_id(current_user)}")

        if isinstance(request, dict):
            quality_request = PersonaQualityRequest(**request)
        else:
            quality_request = request

        quality_metrics = await assess_persona_quality_internal(
            quality_request.core_persona,
            quality_request.platform_personas,
            quality_request.user_feedback
        )

        return PersonaQualityResponse(
            success=True,
            quality_metrics=quality_metrics,
            recommendations=quality_metrics.get('recommendations', [])
        )

    except Exception as e:
        logger.error(f"Quality assessment error: {str(e)}")
        return PersonaQualityResponse(
            success=False,
            error=f"Quality assessment failed: {str(e)}"
        )


@router.post("/step4/regenerate-persona")
async def regenerate_persona(
    request: Union[PersonaGenerationRequest, Dict[str, Any]],
    current_user: Dict[str, Any] = Depends(get_current_user)
):
    """
    Regenerate persona with different parameters or improved analysis.

    Delegates to the synchronous generate_writing_personas handler.
    """
    try:
        logger.info(f"Regenerating persona for user: {_extract_user_id(current_user)}")
        return await generate_writing_personas(request, current_user)
    except Exception as e:
        logger.error(f"Persona regeneration error: {str(e)}")
        return PersonaGenerationResponse(
            success=False,
            error=f"Persona regeneration failed: {str(e)}"
        )


@router.post("/step4/test-background-task")
async def test_background_task(
    background_tasks: BackgroundTasks = BackgroundTasks()
):
    """Test endpoint to verify background task execution."""
    def simple_background_task():
        logger.info("BACKGROUND TASK EXECUTED SUCCESSFULLY!")
        return "Task completed"

    background_tasks.add_task(simple_background_task)
    logger.info("Background task added to queue")

    return {"message": "Background task added", "status": "success"}


@router.get("/step4/persona-options")
async def get_persona_generation_options(
    current_user: Dict[str, Any] = Depends(get_current_user)
):
    """
    Get available options for persona generation (platforms, preferences, etc.).
    """
    try:
        return {
            "success": True,
            "available_platforms": [
                {"id": "linkedin", "name": "LinkedIn", "description": "Professional networking and thought leadership"},
                {"id": "facebook", "name": "Facebook", "description": "Social media and community building"},
                {"id": "twitter", "name": "Twitter", "description": "Micro-blogging and real-time updates"},
                {"id": "blog", "name": "Blog", "description": "Long-form content and SEO optimization"},
                {"id": "instagram", "name": "Instagram", "description": "Visual storytelling and engagement"},
                {"id": "medium", "name": "Medium", "description": "Publishing platform and audience building"},
                {"id": "substack", "name": "Substack", "description": "Newsletter and subscription content"}
            ],
            "persona_types": [
                "Thought Leader",
                "Industry Expert",
                "Content Creator",
                "Brand Ambassador",
                "Community Builder"
            ],
            "quality_metrics": [
                "Style Consistency",
                "Brand Alignment",
                "Platform Optimization",
                "Engagement Potential",
                "Content Quality"
            ]
        }
    except Exception as e:
        logger.error(f"Error getting persona options: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to get persona options: {str(e)}")


async def execute_persona_generation_task(task_id: str, persona_request: PersonaGenerationRequest, current_user: Dict[str, Any]):
    """
    Execute persona generation task in background with progress updates.

    Progress milestones: 10 start, 20 core persona, 40 core done, 50-90 platforms,
    90 quality assessment, 100 completed. On success the result is also written to
    persona_latest_cache for the user. Any failure marks the task "failed" with
    its error message set.
    """
    try:
        logger.info(f"BACKGROUND TASK STARTED: {task_id}")
        logger.info(f"Task {task_id}: Background task execution initiated")

        # Log onboarding data summary for debugging
        onboarding_data = persona_request.onboarding_data
        onboarding_data_summary = {
            "has_websiteAnalysis": bool(onboarding_data.get("websiteAnalysis")),
            "has_competitorResearch": bool(onboarding_data.get("competitorResearch")),
            "has_sitemapAnalysis": bool(onboarding_data.get("sitemapAnalysis")),
            "has_businessData": bool(onboarding_data.get("businessData")),
            "data_keys": list(onboarding_data.keys()) if onboarding_data else []
        }
        logger.info(f"Task {task_id}: Onboarding data summary: {onboarding_data_summary}")

        update_task_status(task_id, "running", 10, "Starting persona generation...")
        logger.info(f"Task {task_id}: Status updated to running")

        user_id = _extract_user_id(current_user)
        total_platforms = len(persona_request.selected_platforms)

        # Inject user-specific API keys into environment for the duration of this background task
        with _user_api_keys_in_env(user_id):
            # Step 1: Generate core persona (1 API call)
            update_task_status(task_id, "running", 20, "Generating core persona...")
            logger.info(f"Task {task_id}: Step 1 - Generating core persona...")
            core_persona = await _generate_core_persona(onboarding_data)

            error_msg = _core_persona_error(core_persona)
            if error_msg is not None:
                if _is_rate_limit_error(error_msg):
                    update_task_status(task_id, "failed", 0, f"Quota exhausted: {error_msg}", error=error_msg)
                    logger.error(f"Task {task_id}: Quota exhausted, marking as failed immediately")
                else:
                    update_task_status(task_id, "failed", 0, f"Core persona generation failed: {error_msg}", error=error_msg)
                return

            update_task_status(task_id, "running", 40, "Core persona generated successfully")
            await asyncio.sleep(CORE_PERSONA_COOLDOWN_SECONDS)

            # Step 2: Generate platform adaptations with rate limiting (N API calls with delays)
            update_task_status(task_id, "running", 50, f"Generating platform adaptations for: {persona_request.selected_platforms}")
            platform_personas = await _generate_platform_personas(core_persona, persona_request, task_id=task_id)

            # Step 3: Assess quality (no additional API calls - uses existing data)
            update_task_status(task_id, "running", 90, "Assessing persona quality...")
            quality_metrics = await assess_persona_quality_internal(
                core_persona,
                platform_personas,
                persona_request.user_preferences
            )

        _log_generation_summary(platform_personas, total_platforms)

        final_result = {
            "success": True,
            "core_persona": core_persona,
            "platform_personas": platform_personas,
            "quality_metrics": quality_metrics
        }
        update_task_status(task_id, "completed", 100, "Persona generation completed successfully", final_result)

        # Populate server-side cache for quick reloads (best effort)
        try:
            persona_latest_cache[user_id] = {
                **final_result,
                "selected_platforms": persona_request.selected_platforms,
                "timestamp": datetime.now().isoformat()
            }
            logger.info(f"Latest persona cached for user {user_id}")
        except Exception as e:
            logger.warning(f"Could not cache latest persona: {e}")

    except Exception as e:
        logger.error(f"Persona generation task {task_id} failed: {str(e)}")
        logger.error(f"Task {task_id}: Exception details: {type(e).__name__}: {str(e)}")
        logger.error(f"Task {task_id}: Full traceback: {traceback.format_exc()}")
        # Pass error= explicitly: update_task_status resets it to None otherwise.
        update_task_status(task_id, "failed", 0, f"Persona generation failed: {str(e)}", error=str(e))


def update_task_status(task_id: str, status: str, progress: int, current_step: str, result: Optional[Dict[str, Any]] = None, error: Optional[str] = None):
    """Update task status in memory storage.

    Overwrites status, progress, current_step, result and error (result and error
    are reset to None unless supplied) and appends a progress message. Unknown
    task IDs, e.g. tasks already evicted as expired, are ignored.
    """
    task = persona_tasks.get(task_id)
    if task is None:
        return

    now = datetime.now().isoformat()
    task.update({
        "status": status,
        "progress": progress,
        "current_step": current_step,
        "updated_at": now,
        "result": result,
        "error": error
    })
    task["progress_messages"].append({
        "timestamp": now,
        "message": current_step,
        "progress": progress
    })


async def generate_single_platform_persona_async(
    core_persona: Dict[str, Any],
    platform: str,
    onboarding_data: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Async wrapper for single platform persona generation.

    Runs the blocking service call in the default executor. Exceptions are
    converted into an {"error": ...} dict rather than raised.
    """
    try:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            core_persona_service._generate_single_platform_persona,
            core_persona,
            platform,
            onboarding_data
        )
    except Exception as e:
        logger.error(f"Error generating {platform} persona: {str(e)}")
        return {"error": f"Failed to generate {platform} persona: {str(e)}"}


async def assess_persona_quality_internal(
    core_persona: Dict[str, Any],
    platform_personas: Dict[str, Any],
    user_preferences: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Internal function to assess persona quality using comprehensive metrics.

    On any error, returns fixed fallback metrics (all scores 75) together with
    the error string, instead of raising.
    """
    try:
        improver = PersonaQualityImprover()

        # Placeholder linguistic scores: no real linguistic analysis is run here.
        linguistic_analysis = {
            "analysis_completeness": 0.85,
            "style_consistency": 0.88,
            "vocabulary_sophistication": 0.82,
            "content_coherence": 0.87
        }

        return improver.assess_persona_quality_comprehensive(
            core_persona,
            platform_personas,
            linguistic_analysis,
            user_preferences
        )

    except Exception as e:
        logger.error(f"Quality assessment internal error: {str(e)}")
        # Return fallback quality metrics compatible with PersonaQualityImprover schema
        return {
            "overall_score": 75,
            "core_completeness": 75,
            "platform_consistency": 75,
            "platform_optimization": 75,
            "linguistic_quality": 75,
            "recommendations": ["Quality assessment completed with default metrics"],
            "weights": {
                "core_completeness": 0.30,
                "platform_consistency": 0.25,
                "platform_optimization": 0.25,
                "linguistic_quality": 0.20
            },
            "error": str(e)
        }


async def _log_persona_generation_result(
    user_id: str,
    core_persona: Dict[str, Any],
    platform_personas: Dict[str, Any],
    quality_metrics: Dict[str, Any]
):
    """Background task to log persona generation results."""
    try:
        logger.info(f"Logging persona generation result for user {user_id}")
        logger.info(f"Core persona generated with {len(core_persona)} characteristics")
        logger.info(f"Platform personas generated for {len(platform_personas)} platforms")
        logger.info(f"Quality metrics: {quality_metrics.get('overall_score', 'N/A')}% overall score")
    except Exception as e:
        logger.error(f"Error logging persona generation result: {str(e)}")