Files
ALwrity/backend/api/onboarding_utils/step4_persona_routes.py

765 lines
33 KiB
Python

"""
Step 4 Persona Generation Routes
Handles AI writing persona generation using the sophisticated persona system.
"""
import asyncio
from typing import Dict, Any, List, Optional, Union
from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks
from pydantic import BaseModel
from loguru import logger
import os
# Rate limiting configuration
RATE_LIMIT_DELAY_SECONDS = 2.0 # Delay between API calls to prevent quota exhaustion
# Task management for long-running persona generation
import uuid
from datetime import datetime, timedelta
from services.persona.core_persona.core_persona_service import CorePersonaService
from services.persona.enhanced_linguistic_analyzer import EnhancedLinguisticAnalyzer
from services.persona.persona_quality_improver import PersonaQualityImprover
from middleware.auth_middleware import get_current_user
from services.user_api_key_context import user_api_keys
# In-memory task storage (in production, use Redis or database).
# Keyed by task_id; entries are created by the async generation endpoint and
# mutated in place by update_task_status. Contents are lost on process restart.
persona_tasks: Dict[str, Dict[str, Any]] = {}
# In-memory latest persona cache per user (24h TTL).
# Keyed by the value returned from _extract_user_id; each entry carries an
# ISO-8601 "timestamp" string that the readers compare against the TTL below.
persona_latest_cache: Dict[str, Dict[str, Any]] = {}
# Maximum age (in hours) before a cached persona is treated as expired.
PERSONA_CACHE_TTL_HOURS = 24
router = APIRouter()
# Initialize services (module-level instances, created once at import time).
core_persona_service = CorePersonaService()
# NOTE(review): linguistic_analyzer and quality_improver are not referenced by
# any route visible in this file - confirm whether they are still needed.
linguistic_analyzer = EnhancedLinguisticAnalyzer()
quality_improver = PersonaQualityImprover()
def _extract_user_id(user: Dict[str, Any]) -> str:
"""Extract a stable user ID from Clerk-authenticated user payloads.
Prefers 'clerk_user_id' or 'id', falls back to 'user_id', else 'unknown'.
"""
if not isinstance(user, dict):
return 'unknown'
return (
user.get('clerk_user_id')
or user.get('id')
or user.get('user_id')
or 'unknown'
)
class PersonaGenerationRequest(BaseModel):
    """Request model for persona generation."""
    # Onboarding payload (required); passed unchanged to the core persona
    # service and to each per-platform generation call.
    onboarding_data: Dict[str, Any]
    # Platforms to generate adapted personas for; one generation call is made
    # per entry, in list order.
    selected_platforms: List[str] = ["linkedin", "blog"]
    # Optional preferences, forwarded to the quality assessment step.
    user_preferences: Optional[Dict[str, Any]] = None
class PersonaGenerationResponse(BaseModel):
    """Response model for persona generation."""
    # False when generation failed; `error` then carries the reason.
    success: bool
    # Core persona as returned by the core persona service.
    core_persona: Optional[Dict[str, Any]] = None
    # Keyed by platform name; a platform that failed is stored as a dict
    # containing an "error" key instead of a persona.
    platform_personas: Optional[Dict[str, Any]] = None
    # Output of the quality assessment step.
    quality_metrics: Optional[Dict[str, Any]] = None
    # Human-readable failure description; None on success.
    error: Optional[str] = None
class PersonaQualityRequest(BaseModel):
    """Request model for persona quality assessment."""
    # Core persona to assess (required).
    core_persona: Dict[str, Any]
    # Platform personas to assess (required).
    platform_personas: Dict[str, Any]
    # Optional feedback; the assess-quality route forwards it to the internal
    # assessment in the `user_preferences` position.
    user_feedback: Optional[Dict[str, Any]] = None
class PersonaQualityResponse(BaseModel):
    """Response model for persona quality assessment."""
    # False when the assessment raised; `error` then carries the reason.
    success: bool
    # Full metrics dict produced by the quality assessment.
    quality_metrics: Optional[Dict[str, Any]] = None
    # Copied from quality_metrics['recommendations'] by the route handler.
    recommendations: Optional[List[str]] = None
    # Human-readable failure description; None on success.
    error: Optional[str] = None
class PersonaTaskStatus(BaseModel):
    """Response model for persona generation task status."""
    # Identifier returned by the async generation endpoint.
    task_id: str
    status: str  # 'pending', 'running', 'completed', 'failed'
    progress: int  # 0-100
    # Description of the step currently being executed.
    current_step: str
    # History of updates; each entry has "timestamp", "message" and "progress".
    progress_messages: List[Dict[str, Any]] = []
    # Final generation result; populated once the task completes.
    result: Optional[Dict[str, Any]] = None
    # Failure description, if any.
    error: Optional[str] = None
    # ISO-8601 timestamps (naive local time, from datetime.now().isoformat()).
    created_at: str
    updated_at: str
@router.post("/step4/generate-personas-async", response_model=Dict[str, str])
async def generate_writing_personas_async(
    request: Union[PersonaGenerationRequest, Dict[str, Any]],
    current_user: Dict[str, Any] = Depends(get_current_user),
    background_tasks: BackgroundTasks = BackgroundTasks()
):
    """
    Start persona generation as an async task and return task ID for polling.

    If a persona younger than PERSONA_CACHE_TTL_HOURS is cached for the user,
    no generation is started: a task record that is already 'completed' is
    registered and returned instead. A cache entry whose timestamp is missing
    or cannot be parsed is treated as stale and generation proceeds normally.

    Args:
        request: Generation parameters, either as a validated model or a raw dict.
        current_user: Authenticated user payload from the auth dependency.
        background_tasks: Queue the generation work is scheduled on.

    Returns:
        Dict with 'task_id', 'status' ('completed' or 'pending') and 'message'.

    Raises:
        HTTPException: 500 when the task could not be created.
    """
    try:
        # Handle both PersonaGenerationRequest and dict inputs
        if isinstance(request, dict):
            persona_request = PersonaGenerationRequest(**request)
        else:
            persona_request = request
        request_data = persona_request.dict()

        user_id = _extract_user_id(current_user)
        task_id = str(uuid.uuid4())
        now = datetime.now()
        now_iso = now.isoformat()

        # If fresh cache exists for this user, short-circuit and return a completed task
        cached = persona_latest_cache.get(user_id)
        cached_at = None
        if cached and isinstance(cached.get("timestamp"), str):
            try:
                cached_at = datetime.fromisoformat(cached["timestamp"])
            except ValueError:
                # A corrupt timestamp must not fail the request; regenerate instead.
                logger.warning(f"Ignoring cached persona with invalid timestamp for user {user_id}")
        if cached_at is not None and (now - cached_at) <= timedelta(hours=PERSONA_CACHE_TTL_HOURS):
            persona_tasks[task_id] = {
                "task_id": task_id,
                "status": "completed",
                "progress": 100,
                "current_step": "Persona loaded from cache",
                "progress_messages": [
                    {"timestamp": now_iso, "message": "Loaded cached persona", "progress": 100}
                ],
                "result": {
                    "success": True,
                    "core_persona": cached.get("core_persona"),
                    "platform_personas": cached.get("platform_personas", {}),
                    "quality_metrics": cached.get("quality_metrics", {}),
                },
                "error": None,
                "created_at": now_iso,
                "updated_at": now_iso,
                "user_id": user_id,
                "request_data": request_data,
            }
            logger.info(f"Cache hit for user {user_id} - returning completed task without regeneration: {task_id}")
            return {
                "task_id": task_id,
                "status": "completed",
                "message": "Persona loaded from cache"
            }

        # Initialize task status
        persona_tasks[task_id] = {
            "task_id": task_id,
            "status": "pending",
            "progress": 0,
            "current_step": "Initializing persona generation...",
            "progress_messages": [],
            "result": None,
            "error": None,
            "created_at": now_iso,
            "updated_at": now_iso,
            "user_id": user_id,
            "request_data": request_data,
        }
        # Start background task
        background_tasks.add_task(
            execute_persona_generation_task,
            task_id,
            persona_request,
            current_user
        )
        logger.info(f"Started async persona generation task: {task_id}")
        logger.info(f"Background task added successfully for task: {task_id}")
        return {
            "task_id": task_id,
            "status": "pending",
            "message": "Persona generation started. Use task_id to poll for progress."
        }
    except Exception as e:
        logger.error(f"Failed to start persona generation task: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to start task: {str(e)}")
@router.get("/step4/persona-latest", response_model=Dict[str, Any])
async def get_latest_persona(current_user: Dict[str, Any] = Depends(get_current_user)):
    """Return latest cached persona for the current user if available and fresh.

    The endpoint always answers with HTTP 200. A missing or expired cache entry
    is reported as ``success=False`` with ``status_code`` 404 in the body, and an
    unexpected failure as ``success=False`` with ``status_code`` 500, so that
    the frontend does not see connection errors when it calls this during an
    auto-initialization phase. An expired entry is evicted from the cache.

    Args:
        current_user: Authenticated user payload from the auth dependency.

    Returns:
        ``{"success": True, "persona": <cached entry>}`` on a hit, otherwise a
        dict with ``success=False``, ``persona=None``, ``message`` and
        ``status_code``.
    """
    def _not_available(detail: str) -> Dict[str, Any]:
        # Body-level 404: logged as a notice rather than raised to the client.
        logger.warning(f"Persona retrieval notice (returning success=False): {detail}")
        return {
            "success": False,
            "persona": None,
            "message": detail,
            "status_code": 404
        }

    try:
        user_id = _extract_user_id(current_user)
        cached = persona_latest_cache.get(user_id)
        if not cached:
            return _not_available("No cached persona found")
        ts = datetime.fromisoformat(cached["timestamp"]) if isinstance(cached.get("timestamp"), str) else None
        if not ts or (datetime.now() - ts) > timedelta(hours=PERSONA_CACHE_TTL_HOURS):
            # Expired
            persona_latest_cache.pop(user_id, None)
            return _not_available("Cached persona expired")
        return {"success": True, "persona": cached}
    except Exception as e:
        # logger.exception attaches the traceback. The error is passed as a
        # format argument so that braces in its text cannot break formatting.
        logger.exception("Error getting latest persona: {}", e)
        return {
            "success": False,
            "persona": None,
            "message": f"Internal error retrieving persona: {str(e)}",
            "status_code": 500
        }
@router.post("/step4/persona-save", response_model=Dict[str, Any])
async def save_persona_update(
    request: Dict[str, Any],
    current_user: Dict[str, Any] = Depends(get_current_user)
):
    """Save/overwrite latest persona cache for current user (from edited UI).

    Only the recognised keys are copied from the request body; the entry is
    stamped with the current time, which restarts its TTL.

    Args:
        request: Body with optional 'core_persona', 'platform_personas',
            'quality_metrics' and 'selected_platforms' keys.
        current_user: Authenticated user payload from the auth dependency.

    Returns:
        ``{"success": True}`` on success, otherwise a dict with
        ``success=False``, ``message`` and ``status_code`` 500 (the HTTP status
        itself stays 200).
    """
    try:
        user_id = _extract_user_id(current_user)
        payload = {
            "success": True,
            "core_persona": request.get("core_persona"),
            "platform_personas": request.get("platform_personas", {}),
            "quality_metrics": request.get("quality_metrics", {}),
            "selected_platforms": request.get("selected_platforms", []),
            "timestamp": datetime.now().isoformat()
        }
        persona_latest_cache[user_id] = payload
        logger.info(f"Saved latest persona to cache for user {user_id}")
        return {"success": True}
    except Exception as e:
        # logger.exception attaches the traceback. The error is passed as a
        # format argument so that braces in its text cannot break formatting.
        logger.exception("Error saving latest persona: {}", e)
        return {
            "success": False,
            "message": f"Failed to save persona: {str(e)}",
            "status_code": 500
        }
@router.get("/step4/persona-task/{task_id}", response_model=PersonaTaskStatus)
async def get_persona_task_status(task_id: str):
    """
    Look up a persona generation task and report its current state.

    A task is only evicted when it is polled: if the polled task was created
    more than one hour ago it is deleted and reported as expired.

    Raises:
        HTTPException: 404 when the task is unknown or has expired.
    """
    # NOTE(review): unlike the other routes here, this one declares no auth
    # dependency, so any caller holding a task_id can read the result - confirm
    # that this is intended.
    try:
        task_record = persona_tasks[task_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Task not found")

    # Clean up old tasks (older than 1 hour)
    task_age = datetime.now() - datetime.fromisoformat(task_record["created_at"])
    if task_age > timedelta(hours=1):
        persona_tasks.pop(task_id)
        raise HTTPException(status_code=404, detail="Task expired")

    return PersonaTaskStatus(**task_record)
@router.post("/step4/generate-personas", response_model=PersonaGenerationResponse)
async def generate_writing_personas(
    request: Union[PersonaGenerationRequest, Dict[str, Any]],
    current_user: Dict[str, Any] = Depends(get_current_user)
):
    """
    Generate AI writing personas synchronously within the request.

    Steps:
    1. Generate core persona (1 API call)
    2. Generate one adaptation per selected platform, sequentially, waiting
       RATE_LIMIT_DELAY_SECONDS between calls to avoid rate limits
    3. Assess quality (no additional API calls - uses existing data)
    Total API calls: 1 + N platforms

    A failure for a single platform does not abort the request; that platform
    is recorded as ``{"error": ...}`` in ``platform_personas``.

    Args:
        request: Generation parameters, either as a validated model or a raw dict.
        current_user: Authenticated user payload from the auth dependency.

    Returns:
        PersonaGenerationResponse; ``success=False`` with ``error`` set when the
        core persona could not be generated or an unexpected error occurred.
    """
    # NOTE(review): unlike the background task, this path does not inject the
    # user's API keys into the environment - confirm whether that is intended.
    try:
        logger.info(f"Starting OPTIMIZED persona generation for user: {_extract_user_id(current_user)}")
        # Handle both PersonaGenerationRequest and dict inputs
        if isinstance(request, dict):
            # Convert dict to PersonaGenerationRequest
            persona_request = PersonaGenerationRequest(**request)
        else:
            persona_request = request
        selected_platforms = persona_request.selected_platforms
        total_platforms = len(selected_platforms)
        logger.info(f"Selected platforms: {selected_platforms}")

        # Step 1: Generate core persona (1 API call)
        logger.info("Step 1: Generating core persona...")
        # The service call is blocking, so run it in the default executor.
        core_persona = await asyncio.get_running_loop().run_in_executor(
            None,
            core_persona_service.generate_core_persona,
            persona_request.onboarding_data
        )
        # Add small delay after core persona generation
        await asyncio.sleep(1.0)
        if "error" in core_persona:
            logger.error(f"Core persona generation failed: {core_persona['error']}")
            return PersonaGenerationResponse(
                success=False,
                error=f"Core persona generation failed: {core_persona['error']}"
            )

        # Step 2: Generate platform adaptations with rate limiting (N API calls with delays)
        logger.info(f"Step 2: Generating platform adaptations with rate limiting for: {selected_platforms}")
        platform_personas = {}
        # Process platforms sequentially with small delays to avoid rate limits
        for i, platform in enumerate(selected_platforms):
            try:
                logger.info(f"Generating {platform} persona ({i+1}/{total_platforms})")
                # Add delay between API calls to prevent rate limiting
                if i > 0:  # Skip delay for first platform
                    logger.info(f"Rate limiting: Waiting {RATE_LIMIT_DELAY_SECONDS}s before next API call...")
                    await asyncio.sleep(RATE_LIMIT_DELAY_SECONDS)
                # Generate platform persona
                result = await generate_single_platform_persona_async(
                    core_persona,
                    platform,
                    persona_request.onboarding_data
                )
                if isinstance(result, Exception):
                    error_msg = str(result)
                    logger.error(f"Platform {platform} generation failed: {error_msg}")
                    platform_personas[platform] = {"error": error_msg}
                elif "error" in result:
                    platform_personas[platform] = result
                    # The error payload is not guaranteed to be a string;
                    # normalise it so the checks below cannot raise and
                    # overwrite the stored result.
                    error_msg = str(result['error'])
                    logger.error(f"Platform {platform} generation failed: {error_msg}")
                    # Check for rate limit errors and suggest retry
                    lowered = error_msg.lower()
                    if "429" in error_msg or "quota" in lowered or "rate limit" in lowered:
                        logger.warning(f"⚠️ Rate limit detected for {platform}. Consider increasing RATE_LIMIT_DELAY_SECONDS")
                else:
                    platform_personas[platform] = result
                    logger.info(f"{platform} persona generated successfully")
            except Exception as e:
                logger.error(f"Platform {platform} generation error: {str(e)}")
                platform_personas[platform] = {"error": str(e)}

        # Step 3: Assess quality (no additional API calls - uses existing data)
        logger.info("Step 3: Assessing persona quality...")
        quality_metrics = await assess_persona_quality_internal(
            core_persona,
            platform_personas,
            persona_request.user_preferences
        )

        # Log performance metrics
        successful_platforms = sum(1 for p in platform_personas.values() if "error" not in p)
        logger.info(f"✅ Persona generation completed: {successful_platforms}/{total_platforms} platforms successful")
        logger.info(f"📊 API calls made: 1 (core) + {total_platforms} (platforms) = {1 + total_platforms} total")
        logger.info(f"⏱️ Rate limiting: Sequential processing with {RATE_LIMIT_DELAY_SECONDS}s delays to prevent quota exhaustion")
        return PersonaGenerationResponse(
            success=True,
            core_persona=core_persona,
            platform_personas=platform_personas,
            quality_metrics=quality_metrics
        )
    except Exception as e:
        logger.error(f"Persona generation error: {str(e)}")
        return PersonaGenerationResponse(
            success=False,
            error=f"Persona generation failed: {str(e)}"
        )
@router.post("/step4/assess-quality", response_model=PersonaQualityResponse)
async def assess_persona_quality(
    request: Union[PersonaQualityRequest, Dict[str, Any]],
    current_user: Dict[str, Any] = Depends(get_current_user)
):
    """
    Assess the quality of generated personas and provide improvement recommendations.

    Args:
        request: Personas to assess, either as a validated model or a raw dict.
        current_user: Authenticated user payload from the auth dependency.

    Returns:
        PersonaQualityResponse with the metrics and their 'recommendations'
        list, or ``success=False`` with ``error`` set if the assessment raised.
    """
    try:
        # Resolve the ID the same way the cache and task routes do, so log
        # lines for one user can be correlated across endpoints.
        logger.info(f"Assessing persona quality for user: {_extract_user_id(current_user)}")
        # Handle both PersonaQualityRequest and dict inputs
        if isinstance(request, dict):
            # Convert dict to PersonaQualityRequest
            quality_request = PersonaQualityRequest(**request)
        else:
            quality_request = request
        quality_metrics = await assess_persona_quality_internal(
            quality_request.core_persona,
            quality_request.platform_personas,
            quality_request.user_feedback
        )
        return PersonaQualityResponse(
            success=True,
            quality_metrics=quality_metrics,
            recommendations=quality_metrics.get('recommendations', [])
        )
    except Exception as e:
        logger.error(f"Quality assessment error: {str(e)}")
        return PersonaQualityResponse(
            success=False,
            error=f"Quality assessment failed: {str(e)}"
        )
@router.post("/step4/regenerate-persona")
async def regenerate_persona(
    request: Union[PersonaGenerationRequest, Dict[str, Any]],
    current_user: Dict[str, Any] = Depends(get_current_user)
):
    """
    Regenerate persona with different parameters or improved analysis.

    Delegates to generate_writing_personas with the same arguments, so the
    response has the same shape as the regular generation endpoint.

    Args:
        request: Generation parameters, either as a validated model or a raw dict.
        current_user: Authenticated user payload from the auth dependency.

    Returns:
        PersonaGenerationResponse from the delegated call, or one with
        ``success=False`` if the delegation itself raised.
    """
    try:
        # Resolve the ID the same way the cache and task routes do, so log
        # lines for one user can be correlated across endpoints.
        logger.info(f"Regenerating persona for user: {_extract_user_id(current_user)}")
        # Use the same generation logic but with potentially different parameters
        return await generate_writing_personas(request, current_user)
    except Exception as e:
        logger.error(f"Persona regeneration error: {str(e)}")
        return PersonaGenerationResponse(
            success=False,
            error=f"Persona regeneration failed: {str(e)}"
        )
@router.post("/step4/test-background-task")
async def test_background_task(
    background_tasks: BackgroundTasks = BackgroundTasks()
):
    """Diagnostic endpoint: queue a trivial background task that only logs.

    Responds immediately; the queued task logs a line when it runs.
    """
    def _announce_execution():
        logger.info("BACKGROUND TASK EXECUTED SUCCESSFULLY!")
        return "Task completed"

    background_tasks.add_task(_announce_execution)
    logger.info("Background task added to queue")

    response = {"message": "Background task added", "status": "success"}
    return response
@router.get("/step4/persona-options")
async def get_persona_generation_options(
    current_user: Dict[str, Any] = Depends(get_current_user)
):
    """
    List the static choices offered for persona generation: the supported
    platforms, the persona types and the names of the quality metrics.

    Raises:
        HTTPException: 500 if building the response fails.
    """
    try:
        # (id, display name, description) for every supported platform.
        platform_rows = (
            ("linkedin", "LinkedIn", "Professional networking and thought leadership"),
            ("facebook", "Facebook", "Social media and community building"),
            ("twitter", "Twitter", "Micro-blogging and real-time updates"),
            ("blog", "Blog", "Long-form content and SEO optimization"),
            ("instagram", "Instagram", "Visual storytelling and engagement"),
            ("medium", "Medium", "Publishing platform and audience building"),
            ("substack", "Substack", "Newsletter and subscription content"),
        )
        available_platforms = [
            {"id": platform_id, "name": label, "description": summary}
            for platform_id, label, summary in platform_rows
        ]
        persona_types = [
            "Thought Leader",
            "Industry Expert",
            "Content Creator",
            "Brand Ambassador",
            "Community Builder",
        ]
        metric_names = [
            "Style Consistency",
            "Brand Alignment",
            "Platform Optimization",
            "Engagement Potential",
            "Content Quality",
        ]
        return {
            "success": True,
            "available_platforms": available_platforms,
            "persona_types": persona_types,
            "quality_metrics": metric_names,
        }
    except Exception as exc:
        logger.error(f"Error getting persona options: {str(exc)}")
        raise HTTPException(status_code=500, detail=f"Failed to get persona options: {str(exc)}")
async def execute_persona_generation_task(task_id: str, persona_request: PersonaGenerationRequest, current_user: Dict[str, Any]):
    """
    Execute persona generation task in background with progress updates.

    Runs the same three steps as the synchronous endpoint (core persona,
    per-platform adaptations with RATE_LIMIT_DELAY_SECONDS between calls,
    quality assessment) while recording progress through update_task_status.
    The user's API keys are exported into os.environ for the duration of the
    generation and restored afterwards. On success the result is stored on the
    task and in persona_latest_cache; on failure the task is marked 'failed'
    with its ``error`` field set.

    Args:
        task_id: Key of the task record in persona_tasks.
        persona_request: Validated generation parameters.
        current_user: Authenticated user payload captured when the task was queued.

    Returns:
        None. All outcomes are reported through the task record.
    """
    try:
        logger.info(f"BACKGROUND TASK STARTED: {task_id}")
        logger.info(f"Task {task_id}: Background task execution initiated")
        # Log onboarding data summary for debugging
        onboarding_data_summary = {
            "has_websiteAnalysis": bool(persona_request.onboarding_data.get("websiteAnalysis")),
            "has_competitorResearch": bool(persona_request.onboarding_data.get("competitorResearch")),
            "has_sitemapAnalysis": bool(persona_request.onboarding_data.get("sitemapAnalysis")),
            "has_businessData": bool(persona_request.onboarding_data.get("businessData")),
            "data_keys": list(persona_request.onboarding_data.keys()) if persona_request.onboarding_data else []
        }
        logger.info(f"Task {task_id}: Onboarding data summary: {onboarding_data_summary}")
        # Update task status to running
        update_task_status(task_id, "running", 10, "Starting persona generation...")
        logger.info(f"Task {task_id}: Status updated to running")
        # Inject user-specific API keys into environment for the duration of this background task
        # NOTE(review): os.environ is process-global and this coroutine awaits
        # while the keys are injected, so two tasks for different users running
        # concurrently can observe or restore each other's keys. Passing keys
        # explicitly to the services would avoid this - confirm and follow up.
        user_id = _extract_user_id(current_user)
        env_mapping = {
            'gemini': 'GEMINI_API_KEY',
            'exa': 'EXA_API_KEY',
            'openai': 'OPENAI_API_KEY',
            'anthropic': 'ANTHROPIC_API_KEY',
            'mistral': 'MISTRAL_API_KEY',
            'copilotkit': 'COPILOTKIT_API_KEY',
            'tavily': 'TAVILY_API_KEY',
            'serper': 'SERPER_API_KEY',
            'firecrawl': 'FIRECRAWL_API_KEY',
        }
        # Previous values of every variable we overwrite (None = was unset).
        original_env: Dict[str, Optional[str]] = {}
        with user_api_keys(user_id) as keys:
            try:
                for provider, env_var in env_mapping.items():
                    value = keys.get(provider)
                    if value:
                        original_env[env_var] = os.environ.get(env_var)
                        os.environ[env_var] = value
                        logger.debug(f"[BG TASK] Injected {env_var} for user {user_id}")
                # Step 1: Generate core persona (1 API call)
                update_task_status(task_id, "running", 20, "Generating core persona...")
                logger.info(f"Task {task_id}: Step 1 - Generating core persona...")
                # The service call is blocking, so run it in the default executor.
                core_persona = await asyncio.get_running_loop().run_in_executor(
                    None,
                    core_persona_service.generate_core_persona,
                    persona_request.onboarding_data
                )
                if "error" in core_persona:
                    error_msg = core_persona['error']
                    # Check if this is a quota/rate limit error
                    if "RESOURCE_EXHAUSTED" in str(error_msg) or "429" in str(error_msg) or "quota" in str(error_msg).lower():
                        update_task_status(task_id, "failed", 0, f"Quota exhausted: {error_msg}", error=str(error_msg))
                        logger.error(f"Task {task_id}: Quota exhausted, marking as failed immediately")
                    else:
                        update_task_status(task_id, "failed", 0, f"Core persona generation failed: {error_msg}", error=str(error_msg))
                    return
                update_task_status(task_id, "running", 40, "Core persona generated successfully")
                # Add small delay after core persona generation
                await asyncio.sleep(1.0)
                # Step 2: Generate platform adaptations with rate limiting (N API calls with delays)
                update_task_status(task_id, "running", 50, f"Generating platform adaptations for: {persona_request.selected_platforms}")
                platform_personas = {}
                total_platforms = len(persona_request.selected_platforms)
                # Process platforms sequentially with small delays to avoid rate limits
                for i, platform in enumerate(persona_request.selected_platforms):
                    try:
                        # Platform work spans the 50-90% progress range.
                        progress = 50 + (i * 40 // total_platforms)
                        update_task_status(task_id, "running", progress, f"Generating {platform} persona ({i+1}/{total_platforms})")
                        # Add delay between API calls to prevent rate limiting
                        if i > 0:  # Skip delay for first platform
                            update_task_status(task_id, "running", progress, f"Rate limiting: Waiting {RATE_LIMIT_DELAY_SECONDS}s before next API call...")
                            await asyncio.sleep(RATE_LIMIT_DELAY_SECONDS)
                        # Generate platform persona
                        result = await generate_single_platform_persona_async(
                            core_persona,
                            platform,
                            persona_request.onboarding_data
                        )
                        if isinstance(result, Exception):
                            error_msg = str(result)
                            logger.error(f"Platform {platform} generation failed: {error_msg}")
                            platform_personas[platform] = {"error": error_msg}
                        elif "error" in result:
                            platform_personas[platform] = result
                            # The error payload is not guaranteed to be a
                            # string; normalise it so the checks below cannot
                            # raise and overwrite the stored result.
                            error_msg = str(result['error'])
                            logger.error(f"Platform {platform} generation failed: {error_msg}")
                            # Check for rate limit errors and suggest retry
                            lowered = error_msg.lower()
                            if "429" in error_msg or "quota" in lowered or "rate limit" in lowered:
                                logger.warning(f"⚠️ Rate limit detected for {platform}. Consider increasing RATE_LIMIT_DELAY_SECONDS")
                        else:
                            platform_personas[platform] = result
                            logger.info(f"{platform} persona generated successfully")
                    except Exception as e:
                        logger.error(f"Platform {platform} generation error: {str(e)}")
                        platform_personas[platform] = {"error": str(e)}
                # Step 3: Assess quality (no additional API calls - uses existing data)
                update_task_status(task_id, "running", 90, "Assessing persona quality...")
                quality_metrics = await assess_persona_quality_internal(
                    core_persona,
                    platform_personas,
                    persona_request.user_preferences
                )
            finally:
                # Restore environment
                for env_var, original_value in original_env.items():
                    if original_value is None:
                        os.environ.pop(env_var, None)
                    else:
                        os.environ[env_var] = original_value
                logger.debug(f"[BG TASK] Restored environment for user {user_id}")
        # Log performance metrics
        successful_platforms = sum(1 for p in platform_personas.values() if "error" not in p)
        logger.info(f"✅ Persona generation completed: {successful_platforms}/{total_platforms} platforms successful")
        logger.info(f"📊 API calls made: 1 (core) + {total_platforms} (platforms) = {1 + total_platforms} total")
        logger.info(f"⏱️ Rate limiting: Sequential processing with {RATE_LIMIT_DELAY_SECONDS}s delays to prevent quota exhaustion")
        # Create final result
        final_result = {
            "success": True,
            "core_persona": core_persona,
            "platform_personas": platform_personas,
            "quality_metrics": quality_metrics
        }
        # Update task status to completed
        update_task_status(task_id, "completed", 100, "Persona generation completed successfully", final_result)
        # Populate server-side cache for quick reloads
        try:
            persona_latest_cache[user_id] = {
                **final_result,
                "selected_platforms": persona_request.selected_platforms,
                "timestamp": datetime.now().isoformat()
            }
            logger.info(f"Latest persona cached for user {user_id}")
        except Exception as e:
            logger.warning(f"Could not cache latest persona: {e}")
    except Exception as e:
        logger.error(f"Persona generation task {task_id} failed: {str(e)}")
        logger.error(f"Task {task_id}: Exception details: {type(e).__name__}: {str(e)}")
        import traceback
        logger.error(f"Task {task_id}: Full traceback: {traceback.format_exc()}")
        # Pass the error explicitly: update_task_status resets `error` to None
        # otherwise, leaving pollers with a failed task and no reason.
        update_task_status(task_id, "failed", 0, f"Persona generation failed: {str(e)}", error=str(e))
def update_task_status(task_id: str, status: str, progress: int, current_step: str, result: Optional[Dict[str, Any]] = None, error: Optional[str] = None):
    """Record a new state for a task and append it to the task's history.

    Unknown task IDs are ignored. ``result`` and ``error`` are written on every
    call, so leaving them at their defaults resets the stored values to None.
    """
    if task_id not in persona_tasks:
        return

    task_record = persona_tasks[task_id]
    task_record.update(
        status=status,
        progress=progress,
        current_step=current_step,
        updated_at=datetime.now().isoformat(),
        result=result,
        error=error,
    )
    # Add progress message
    history_entry = {
        "timestamp": datetime.now().isoformat(),
        "message": current_step,
        "progress": progress,
    }
    task_record["progress_messages"].append(history_entry)
async def generate_single_platform_persona_async(
    core_persona: Dict[str, Any],
    platform: str,
    onboarding_data: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Async wrapper for single platform persona generation.

    Runs the service's blocking per-platform generator in the event loop's
    default executor so the loop stays responsive while it works.

    Args:
        core_persona: Previously generated core persona.
        platform: Name of the platform to adapt the persona for.
        onboarding_data: Onboarding payload forwarded to the service.

    Returns:
        Whatever the service returns for the platform; if the call raises, a
        dict with a single 'error' key describing the failure. This function
        itself does not propagate exceptions from the service.
    """
    try:
        # get_running_loop is the supported way to obtain the loop from inside
        # a coroutine; get_event_loop has policy-dependent behaviour.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            core_persona_service._generate_single_platform_persona,
            core_persona,
            platform,
            onboarding_data
        )
    except Exception as e:
        logger.error(f"Error generating {platform} persona: {str(e)}")
        return {"error": f"Failed to generate {platform} persona: {str(e)}"}
async def assess_persona_quality_internal(
    core_persona: Dict[str, Any],
    platform_personas: Dict[str, Any],
    user_preferences: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Score the generated personas with a fresh PersonaQualityImprover.

    The linguistic analysis fed to the improver is a fixed placeholder. If
    anything raises, a default metrics dict (every score 75) is returned with
    the failure text under 'error' instead of propagating the exception.
    """
    try:
        from services.persona.persona_quality_improver import PersonaQualityImprover

        # Use mock linguistic analysis if not available
        placeholder_analysis = dict(
            analysis_completeness=0.85,
            style_consistency=0.88,
            vocabulary_sophistication=0.82,
            content_coherence=0.87,
        )
        # Initialize quality improver
        improver = PersonaQualityImprover()
        # Get comprehensive quality metrics
        return improver.assess_persona_quality_comprehensive(
            core_persona,
            platform_personas,
            placeholder_analysis,
            user_preferences
        )
    except Exception as exc:
        logger.error(f"Quality assessment internal error: {str(exc)}")
        # Return fallback quality metrics compatible with PersonaQualityImprover schema
        weights = {
            "core_completeness": 0.30,
            "platform_consistency": 0.25,
            "platform_optimization": 0.25,
            "linguistic_quality": 0.20,
        }
        fallback: Dict[str, Any] = {"overall_score": 75}
        # Every weighted component gets the same default score.
        for component in weights:
            fallback[component] = 75
        fallback["recommendations"] = ["Quality assessment completed with default metrics"]
        fallback["weights"] = weights
        fallback["error"] = str(exc)
        return fallback
async def _log_persona_generation_result(
    user_id: str,
    core_persona: Dict[str, Any],
    platform_personas: Dict[str, Any],
    quality_metrics: Dict[str, Any]
):
    """Write a short summary of a generation run to the log.

    Any failure while producing the summary is itself logged and swallowed.
    """
    try:
        logger.info(f"Logging persona generation result for user {user_id}")

        characteristic_count = len(core_persona)
        logger.info(f"Core persona generated with {characteristic_count} characteristics")

        platform_count = len(platform_personas)
        logger.info(f"Platform personas generated for {platform_count} platforms")

        overall_score = quality_metrics.get('overall_score', 'N/A')
        logger.info(f"Quality metrics: {overall_score}% overall score")
    except Exception as exc:
        logger.error(f"Error logging persona generation result: {str(exc)}")