# ALwrity/backend/services/research/research_persona_service.py
# (file-viewer listing: 590 lines, 27 KiB, Python)

"""
Research Persona Service
Handles generation, caching, and retrieval of AI-powered research personas.
"""
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
from loguru import logger
from fastapi import HTTPException
from sqlalchemy import text
from services.database import get_db_session
from models.onboarding import PersonaData, OnboardingSession
from models.research_persona_models import ResearchPersona
from .research_persona_prompt_builder import ResearchPersonaPromptBuilder
from services.llm_providers.main_text_generation import llm_text_gen
from services.persona_data_service import PersonaDataService
from api.content_planning.services.content_strategy.onboarding import OnboardingDataIntegrationService
class ResearchPersonaService:
    """Service for generating and managing research personas.

    A research persona is stored on the user's ``PersonaData`` row in the
    ``research_persona`` (JSON) and ``research_persona_generated_at`` columns.
    It is produced by an LLM call whose prompt is built from onboarding data.
    """

    CACHE_TTL_DAYS = 7  # 7-day cache TTL

    def __init__(self, db_session=None):
        """
        Args:
            db_session: Optional database session reused by every method. When
                omitted, each method opens its own session via ``get_db_session``
                and closes it before returning.
        """
        self.db = db_session
        self.prompt_builder = ResearchPersonaPromptBuilder()
        # self.persona_data_service was initialized here but unused in this service
        self.integration_service = OnboardingDataIntegrationService()
        # Flipped after the first runtime column check so it is not repeated.
        self._research_persona_cols_checked = False

    def _get_session(self, user_id: str):
        """Return ``(session, should_close)``.

        ``should_close`` is True only when a new session was opened here, so
        callers never close a session that was injected through ``__init__``.
        """
        if self.db is not None:
            return self.db, False
        return get_db_session(user_id), True

    def _ensure_research_persona_columns(self, session_db) -> None:
        """Ensure research_persona columns exist in persona_data table (runtime migration).

        SQLite: existing columns are listed with ``PRAGMA table_info`` and each
        missing column is added. Any other backend (written for PostgreSQL): a
        ``SELECT ... LIMIT 0`` probe is run and, if it fails, both columns are
        added with ``ADD COLUMN IF NOT EXISTS``.

        The check runs at most once per service instance, even when it fails.

        Raises:
            Exception: any database error, re-raised after rolling back.
        """
        if self._research_persona_cols_checked:
            return
        try:
            db_url = str(session_db.bind.url) if session_db.bind else ""
            if 'sqlite' in db_url.lower():
                result = session_db.execute(text("PRAGMA table_info(persona_data)"))
                cols = {row[1] for row in result}  # Column name is at index 1
                if 'research_persona' not in cols:
                    logger.info("Adding missing column research_persona to persona_data table")
                    session_db.execute(text("ALTER TABLE persona_data ADD COLUMN research_persona JSON"))
                    session_db.commit()
                if 'research_persona_generated_at' not in cols:
                    logger.info("Adding missing column research_persona_generated_at to persona_data table")
                    session_db.execute(text("ALTER TABLE persona_data ADD COLUMN research_persona_generated_at TIMESTAMP"))
                    session_db.commit()
            else:
                try:
                    session_db.execute(text("SELECT research_persona, research_persona_generated_at FROM persona_data LIMIT 0"))
                except Exception:
                    # A failed statement aborts the open PostgreSQL transaction; without
                    # this rollback the ALTERs below fail with "current transaction is aborted".
                    session_db.rollback()
                    logger.info("Adding missing columns research_persona and research_persona_generated_at to persona_data table")
                    try:
                        # IF NOT EXISTS (PostgreSQL 9.6+) also covers the case where only
                        # one of the two columns is missing.
                        session_db.execute(text("ALTER TABLE persona_data ADD COLUMN IF NOT EXISTS research_persona JSONB"))
                        session_db.execute(text("ALTER TABLE persona_data ADD COLUMN IF NOT EXISTS research_persona_generated_at TIMESTAMP"))
                        session_db.commit()
                    except Exception as alter_err:
                        logger.error(f"Failed to add research_persona columns: {alter_err}")
                        session_db.rollback()
                        raise
        except Exception as e:
            logger.error(f"Error ensuring research_persona columns: {e}")
            session_db.rollback()
            raise
        finally:
            # Set even on failure so the migration is not re-attempted on every
            # lookup made through this instance.
            self._research_persona_cols_checked = True

    def get_cached_only(
        self,
        user_id: str
    ) -> Optional[ResearchPersona]:
        """
        Get research persona for user if it exists in database (regardless of cache validity).

        This method NEVER generates - it only returns existing personas.
        Use this for config endpoints to avoid triggering rate limit checks.
        Note: Returns persona even if cache is expired - cache validity only matters for regeneration.

        Args:
            user_id: User ID (Clerk string)

        Returns:
            ResearchPersona if a non-empty persona dict is stored and parses,
            None otherwise (errors are logged, never raised).
        """
        db = None
        should_close = False
        try:
            db, should_close = self._get_session(user_id)
            if not db:
                logger.error(f"Could not get database session for user {user_id}")
                return None

            persona_data = self._get_persona_data_record(user_id, db)
            if not persona_data:
                logger.debug(f"[get_cached_only] No persona data record found for user {user_id}")
                return None

            research_persona_raw = persona_data.research_persona
            # Only a non-empty dict counts as a stored persona; None, "", {} and
            # any other type are treated as "no persona".
            has_persona = isinstance(research_persona_raw, dict) and bool(research_persona_raw)
            logger.info(
                f"[get_cached_only] Checking research persona for user {user_id}: "
                f"persona_data exists=True, research_persona_raw={research_persona_raw is not None}, "
                f"research_persona type={type(research_persona_raw)}, "
                f"has_persona={has_persona}, "
                f"generated_at={persona_data.research_persona_generated_at}"
            )

            if not has_persona:
                logger.info(f"[get_cached_only] ⚠️ No research persona found in database for user {user_id}")
                return None

            # Return the persona regardless of cache validity; validity is only
            # reported here and matters when deciding whether to regenerate.
            try:
                cache_status = "valid" if self.is_cache_valid(persona_data) else "expired"
                logger.info(
                    f"[get_cached_only] ✅ Returning research persona for user {user_id} "
                    f"(cache: {cache_status}, generated_at: {persona_data.research_persona_generated_at})"
                )
                parsed_persona = ResearchPersona(**research_persona_raw)
                logger.info(
                    f"[get_cached_only] ✅ Successfully parsed persona for user {user_id}: "
                    f"industry={parsed_persona.default_industry}, "
                    f"target_audience={parsed_persona.default_target_audience}"
                )
                return parsed_persona
            except Exception as e:
                # logger.exception records the traceback. Loguru has no exc_info kwarg,
                # and passing any kwarg makes it str.format() the message, which
                # breaks on braces inside validation error text.
                logger.exception(f"[get_cached_only] ❌ Failed to parse research persona for user {user_id}: {e}")
                logger.debug(
                    f"[get_cached_only] Persona data details: "
                    f"type={type(research_persona_raw)}, "
                    f"is_dict={isinstance(research_persona_raw, dict)}, "
                    f"value sample: {str(research_persona_raw)[:500] if research_persona_raw else 'None'}"
                )
                return None
        except Exception as e:
            logger.exception(f"[get_cached_only] ❌ Error getting research persona for user {user_id}: {e}")
            return None
        finally:
            if should_close and db:
                db.close()

    def get_or_generate(
        self,
        user_id: str,
        force_refresh: bool = False
    ) -> Optional[ResearchPersona]:
        """
        Get research persona for user, generating it when missing.

        An existing persona is returned even when its cache TTL has expired.
        A new persona is generated only when none is stored, when the stored one
        fails to parse, or when ``force_refresh`` is True.

        Args:
            user_id: User ID (Clerk string)
            force_refresh: If True, regenerate even if a persona already exists

        Returns:
            ResearchPersona if successful, None otherwise

        Raises:
            HTTPException: propagated from generation (e.g. 429 subscription limit).
        """
        db = None
        should_close = False
        try:
            db, should_close = self._get_session(user_id)
            if not db:
                logger.error(f"Could not get database session for get_or_generate (user {user_id})")
                return None

            persona_data = self._get_persona_data_record(user_id, db)
            if not persona_data:
                logger.warning(f"No persona data found for user {user_id}, cannot generate research persona")
                return None

            if not persona_data.research_persona:
                logger.info(f"Research persona missing for user {user_id}, generating...")
            elif force_refresh:
                logger.info(f"Forcing refresh of research persona for user {user_id}")
            else:
                # Expiry alone never triggers regeneration; only force_refresh does.
                if self.is_cache_valid(persona_data):
                    logger.info(f"Using cached research persona for user {user_id}")
                    parse_failure_prefix = "Failed to parse cached research persona"
                else:
                    logger.info(f"Research persona exists for user {user_id} but cache expired - returning existing persona (use force_refresh=true to regenerate)")
                    parse_failure_prefix = "Failed to parse existing research persona"
                try:
                    return ResearchPersona(**persona_data.research_persona)
                except Exception as e:
                    # Fall through to regeneration if parsing fails
                    logger.warning(f"{parse_failure_prefix}: {e}, regenerating...")

            # Reached when: no persona stored, force_refresh=True, or parsing failed.
            logger.info(f"Generating research persona for user {user_id}")
            research_persona = self.generate_research_persona(user_id, db)
            if research_persona is not None:
                # generate_research_persona saves it automatically
                logger.info(f"✅ Research persona generated and saved for user {user_id}")
                return research_persona

            logger.error(
                f"❌ Failed to generate research persona for user {user_id} - "
                f"This is an expensive failure (API call consumed). Check logs above for details."
            )
            return None
        except HTTPException:
            # Re-raise HTTPExceptions (e.g., 429 subscription limit) so they propagate to API
            raise
        except Exception as e:
            logger.error(f"Error getting/generating research persona for user {user_id}: {e}")
            return None
        finally:
            if should_close and db:
                db.close()

    @staticmethod
    def _strip_code_fences(raw_response: str) -> str:
        """Remove a surrounding Markdown code fence from an LLM reply.

        The opening fence may carry any alphabetic language tag (``json``,
        ``JSON``, ...). Text without a fence is only whitespace-stripped.
        """
        import re

        cleaned = raw_response.strip()
        if cleaned.startswith("```"):
            cleaned = re.sub(r"^```[A-Za-z]*", "", cleaned, count=1)
        if cleaned.endswith("```"):
            cleaned = cleaned[:-3]
        return cleaned.strip()

    def generate_research_persona(self, user_id: str, db=None) -> Optional[ResearchPersona]:
        """
        Generate a new research persona for the user and save it.

        Args:
            user_id: User ID (Clerk string)
            db: Optional database session; when omitted one is obtained via
                ``_get_session`` and closed afterwards if it was opened here.

        Returns:
            ResearchPersona if successful (even if saving it failed, which is
            logged), None otherwise.

        Raises:
            HTTPException: from the LLM call, or 429 when it raises RuntimeError
                (treated as a subscription limit).
        """
        import json

        session_db = None
        should_close = False
        try:
            session_db = db
            if not session_db:
                session_db, should_close = self._get_session(user_id)
            if not session_db:
                logger.error(f"Could not get database session for generate_research_persona (user {user_id})")
                return None

            logger.info(f"Generating research persona for user {user_id}")
            onboarding_data = self._collect_onboarding_data(user_id, session_db)
            if not onboarding_data:
                logger.warning(f"Insufficient onboarding data for user {user_id}")
                return None

            prompt = self.prompt_builder.build_research_persona_prompt(onboarding_data)
            json_schema = self.prompt_builder.get_json_schema()

            logger.info(f"Calling LLM for research persona generation (user: {user_id})")
            try:
                response_text = llm_text_gen(
                    prompt=prompt,
                    json_struct=json_schema,
                    user_id=user_id
                )
            except HTTPException:
                # Re-raise HTTPExceptions (e.g., 429 subscription limit) so they propagate to API
                logger.warning(f"HTTPException during LLM call for user {user_id} - re-raising")
                raise
            except RuntimeError as e:
                # RuntimeError from the LLM layer is surfaced as a 429 (subscription limits)
                logger.warning(f"RuntimeError during LLM call for user {user_id}: {e}")
                raise HTTPException(status_code=429, detail=str(e)) from e

            if not response_text:
                logger.error("Empty response from LLM")
                return None

            try:
                # With json_struct, llm_text_gen may already return a parsed dict.
                if isinstance(response_text, dict):
                    parsed = response_text
                elif isinstance(response_text, str):
                    response_text = self._strip_code_fences(response_text)
                    parsed = json.loads(response_text)
                else:
                    logger.error(f"Unexpected response type from LLM: {type(response_text)}")
                    return None

                if not isinstance(parsed, dict):
                    logger.error(f"LLM response JSON is not an object: {type(parsed)}")
                    return None

                # Copy rather than mutate the dict handed back by the LLM layer.
                persona_dict = {**parsed, "generated_at": datetime.utcnow().isoformat()}

                try:
                    research_persona = ResearchPersona(**persona_dict)
                except Exception as validation_error:
                    logger.error(f"Failed to validate ResearchPersona from dict: {validation_error}")
                    logger.debug(f"Persona dict keys: {list(persona_dict.keys())}")
                    logger.debug(f"Persona dict sample: {str(persona_dict)[:500]}")
                    # Re-raise to be caught by the handler below
                    raise

                logger.info(f"✅ Research persona generated successfully for user {user_id}")
                save_success = self.save_research_persona(user_id, research_persona, session_db)
                if not save_success:
                    logger.warning(f"Failed to save generated persona for user {user_id}")
                return research_persona
            except json.JSONDecodeError as e:
                logger.error(f"Failed to parse LLM response as JSON: {e}")
                logger.debug(f"Response text: {str(response_text)[:500]}")
                return None
            except Exception as e:
                logger.error(f"Failed to create ResearchPersona from response: {e}")
                return None
        except HTTPException:
            # Re-raise HTTPExceptions (e.g., 429 subscription limit) so they propagate to API
            raise
        except Exception as e:
            logger.error(f"Error generating research persona for user {user_id}: {e}")
            return None
        finally:
            if should_close and session_db:
                session_db.close()

    def is_cache_valid(self, persona_data: PersonaData) -> bool:
        """
        Check if cached research persona is still valid (within TTL).

        Args:
            persona_data: PersonaData database record

        Returns:
            True if ``research_persona_generated_at`` is set and younger than
            ``CACHE_TTL_DAYS`` days, False otherwise
        """
        generated_at = persona_data.research_persona_generated_at
        if not generated_at:
            return False

        # save_research_persona writes naive UTC, but a timezone-aware column or
        # driver may return an aware datetime; naive/aware subtraction raises
        # TypeError, so take "now" in the same kind as the stored value.
        if generated_at.tzinfo is not None:
            now = datetime.now(generated_at.tzinfo)
        else:
            now = datetime.utcnow()

        cache_age = now - generated_at
        is_valid = cache_age < timedelta(days=self.CACHE_TTL_DAYS)
        if not is_valid:
            logger.debug(f"Cache expired (age: {cache_age.days} days, TTL: {self.CACHE_TTL_DAYS} days)")
        return is_valid

    @staticmethod
    def _persona_to_storable_dict(research_persona: ResearchPersona) -> Dict[str, Any]:
        """Serialize a persona to JSON-native values for the JSON column.

        Pydantic v2's ``model_dump(mode="json")`` converts values such as
        datetimes to JSON-safe forms (plain ``.dict()`` does not, and is
        deprecated in v2). Falls back to ``.json()`` for Pydantic v1 models.
        """
        if hasattr(research_persona, "model_dump"):
            return research_persona.model_dump(mode="json")
        import json
        return json.loads(research_persona.json())

    def save_research_persona(
        self,
        user_id: str,
        research_persona: ResearchPersona,
        db=None
    ) -> bool:
        """
        Save research persona to database and stamp its generation time (UTC).

        Args:
            user_id: User ID (Clerk string)
            research_persona: ResearchPersona to save
            db: Optional database session

        Returns:
            True if successful, False otherwise (errors are logged and rolled back)
        """
        session_db = None
        should_close = False
        try:
            session_db = db
            if not session_db:
                session_db, should_close = self._get_session(user_id)
            if not session_db:
                logger.error(f"Could not get database session for save_research_persona (user {user_id})")
                return False

            persona_data = self._get_persona_data_record(user_id, session_db)
            if not persona_data:
                logger.error(f"No persona data record found for user {user_id}")
                return False

            persona_data.research_persona = self._persona_to_storable_dict(research_persona)
            persona_data.research_persona_generated_at = datetime.utcnow()
            session_db.commit()
            logger.info(f"✅ Research persona saved for user {user_id}")
            return True
        except Exception as e:
            logger.error(f"Error saving research persona for user {user_id}: {e}")
            if session_db:
                session_db.rollback()
            return False
        finally:
            if should_close and session_db:
                session_db.close()

    def _get_persona_data_record(self, user_id: str, db=None) -> Optional[PersonaData]:
        """Get the PersonaData row linked to the user's first OnboardingSession.

        Runs the research_persona column check first. Returns None when no
        session is available, no onboarding session or persona row exists, or
        any error occurs (logged).
        """
        try:
            session_db = db or self.db
            if not session_db:
                logger.error(f"No database session provided for _get_persona_data_record (user {user_id})")
                return None

            # Ensure research_persona columns exist before querying
            self._ensure_research_persona_columns(session_db)

            session = session_db.query(OnboardingSession).filter(
                OnboardingSession.user_id == user_id
            ).first()
            if not session:
                return None

            return session_db.query(PersonaData).filter(
                PersonaData.session_id == session.id
            ).first()
        except Exception as e:
            logger.error(f"Error getting persona data record for user {user_id}: {e}")
            return None

    def _collect_onboarding_data(self, user_id: str, db=None) -> Optional[Dict[str, Any]]:
        """
        Collect all onboarding data needed for research persona generation.

        Returns:
            Dictionary with website_analysis, persona_data, research_preferences,
            business_info and competitor_analysis, or None when there is no
            usable data or an error occurs (logged).
        """
        try:
            session_db = db or self.db
            if not session_db:
                logger.error(f"No database session provided for _collect_onboarding_data (user {user_id})")
                return None

            # Get integrated data via SSOT
            integrated_data = self.integration_service.get_integrated_data_sync(user_id, session_db)
            if not integrated_data:
                logger.warning(f"No integrated data found for user {user_id}")
                return None

            # `or {}` (not a .get default) also covers keys present with a None value,
            # which would otherwise break the .get() calls below.
            website_analysis = integrated_data.get('website_analysis') or {}
            persona_data_dict = integrated_data.get('persona_data') or {}
            research_prefs = integrated_data.get('research_preferences') or {}
            canonical_profile = integrated_data.get('canonical_profile') or {}

            business_info = {}
            canonical_business = canonical_profile.get('business_info')
            if isinstance(canonical_business, dict):
                business_info.update(canonical_business)

            # Prefer canonical profile (SSOT) values for industry/target_audience
            if not business_info.get('industry') and canonical_profile.get('industry'):
                business_info['industry'] = canonical_profile.get('industry')
            if not business_info.get('target_audience') and canonical_profile.get('target_audience'):
                business_info['target_audience'] = canonical_profile.get('target_audience')

            # Fallback: industry focus from website analysis
            if not business_info.get('industry') and website_analysis:
                target_audience_data = website_analysis.get('target_audience', {})
                if isinstance(target_audience_data, dict):
                    industry_focus = target_audience_data.get('industry_focus')
                    if industry_focus:
                        business_info['industry'] = industry_focus

            has_basic_data = bool(
                website_analysis or
                persona_data_dict or
                research_prefs.get('content_types') or
                business_info.get('industry')
            )
            if not has_basic_data:
                logger.warning(f"Insufficient onboarding data for user {user_id} - no basic data found")
                return None

            # With minimal data, add inferred defaults to help the AI
            if not business_info.get('industry'):
                content_types = research_prefs.get('content_types') or []
                if 'blog' in content_types or 'article' in content_types:
                    business_info['industry'] = 'Content Marketing'
                    business_info['inferred'] = True
                elif 'social_media' in content_types:
                    business_info['industry'] = 'Social Media Marketing'
                    business_info['inferred'] = True
                elif 'video' in content_types:
                    business_info['industry'] = 'Video Content Creation'
                    business_info['inferred'] = True

            if not business_info.get('target_audience'):
                business_info['target_audience'] = 'Professionals and content consumers'
                business_info['inferred'] = True

            # Competitor analysis comes from the integrated (SSOT) data when available
            competitor_analysis = integrated_data.get('competitor_analysis') or []

            return {
                "website_analysis": website_analysis,
                "persona_data": persona_data_dict,
                "research_preferences": research_prefs,
                "business_info": business_info,
                "competitor_analysis": competitor_analysis
            }
        except Exception as e:
            logger.error(f"Error collecting onboarding data for user {user_id}: {e}")
            return None