Add brand analysis columns to onboarding database and migration scripts

This commit is contained in:
ajaysi
2025-10-11 17:05:42 +05:30
parent b1ebe1034e
commit 1df12a64a2
25 changed files with 2415 additions and 90 deletions

View File

@@ -8,13 +8,16 @@ from fastapi import HTTPException
from loguru import logger
from services.api_key_manager import get_onboarding_progress_for_user, get_api_key_manager, StepStatus
from services.onboarding_database_service import OnboardingDatabaseService
from services.database import get_db
from services.persona_analysis_service import PersonaAnalysisService
class OnboardingCompletionService:
"""Service for handling onboarding completion logic."""
def __init__(self):
self.required_steps = [1, 2, 3, 6] # Steps 1, 2, 3, and 6 are required
# Only pre-requisite steps; step 6 is the finalization itself
self.required_steps = [1, 2, 3]
async def complete_onboarding(self, current_user: Dict[str, Any]) -> Dict[str, Any]:
"""Complete the onboarding process with full validation."""
@@ -22,8 +25,8 @@ class OnboardingCompletionService:
user_id = str(current_user.get('id'))
progress = get_onboarding_progress_for_user(user_id)
# Validate required steps are completed
missing_steps = self._validate_required_steps(progress)
# Validate required steps are completed (with DB-aware fallbacks)
missing_steps = self._validate_required_steps(user_id, progress)
if missing_steps:
missing_steps_str = ", ".join(missing_steps)
raise HTTPException(
@@ -53,13 +56,75 @@ class OnboardingCompletionService:
logger.error(f"Error completing onboarding: {str(e)}")
raise HTTPException(status_code=500, detail="Internal server error")
def _validate_required_steps(self, progress) -> List[str]:
"""Validate that all required steps are completed."""
def _validate_required_steps(self, user_id: str, progress) -> List[str]:
"""Validate that all required steps are completed.
This method trusts the progress tracker, but also falls back to
database presence for Steps 2 and 3 so migration from file→DB
does not block completion.
"""
missing_steps = []
db = None
db_service = None
try:
db = next(get_db())
db_service = OnboardingDatabaseService(db)
except Exception:
db = None
db_service = None
for step_num in self.required_steps:
step = progress.get_step_data(step_num)
if step and step.status not in [StepStatus.COMPLETED, StepStatus.SKIPPED]:
if step and step.status in [StepStatus.COMPLETED, StepStatus.SKIPPED]:
continue
# DB-aware fallbacks for migration period
try:
if db_service:
if step_num == 2:
# Treat as completed if website analysis exists in DB
website = db_service.get_website_analysis(user_id, db)
if website and (website.get('website_url') or website.get('writing_style')):
# Optionally mark as completed in progress to keep state consistent
try:
progress.mark_step_completed(2, {'source': 'db-fallback'})
except Exception:
pass
continue
# Secondary fallback: research preferences captured style data
prefs = db_service.get_research_preferences(user_id, db)
if prefs and (prefs.get('writing_style') or prefs.get('content_characteristics')):
try:
progress.mark_step_completed(2, {'source': 'research-prefs-fallback'})
except Exception:
pass
continue
# Tertiary fallback: persona data created implies earlier steps done
persona = None
try:
persona = db_service.get_persona_data(user_id, db)
except Exception:
persona = None
if persona and persona.get('corePersona'):
try:
progress.mark_step_completed(2, {'source': 'persona-fallback'})
except Exception:
pass
continue
if step_num == 3:
# Treat as completed if research preferences exist in DB
prefs = db_service.get_research_preferences(user_id, db)
if prefs and prefs.get('research_depth'):
try:
progress.mark_step_completed(3, {'source': 'db-fallback'})
except Exception:
pass
continue
except Exception:
# If DB check fails, fall back to progress status only
pass
if step:
missing_steps.append(step.title)
return missing_steps

View File

@@ -9,6 +9,7 @@ from loguru import logger
from services.api_key_manager import get_api_key_manager
from services.database import get_db
from services.onboarding_database_service import OnboardingDatabaseService
from services.website_analysis_service import WebsiteAnalysisService
from services.research_preferences_service import ResearchPreferencesService
from services.persona_analysis_service import PersonaAnalysisService
@@ -23,14 +24,10 @@ class OnboardingSummaryService:
Args:
user_id: Clerk user ID from authenticated request
"""
# Convert Clerk user ID to integer for database compatibility
try:
self.user_id_int = int(user_id.replace('user_', '').replace('-', '')[:8], 16) % 2147483647
except:
self.user_id_int = hash(user_id) % 2147483647
self.user_id = user_id # Store Clerk user ID (string)
self.db_service = OnboardingDatabaseService()
self.user_id = user_id # Store original Clerk ID for logging
self.session_id = self.user_id_int # Use user ID as session ID for backwards compatibility
logger.info(f"OnboardingSummaryService initialized for user {user_id} (database mode)")
async def get_onboarding_summary(self) -> Dict[str, Any]:
"""Get comprehensive onboarding summary for FinalStep."""
@@ -69,40 +66,75 @@ class OnboardingSummaryService:
raise HTTPException(status_code=500, detail="Internal server error")
def _get_api_keys(self) -> Dict[str, Any]:
"""Get configured API keys."""
api_manager = get_api_key_manager()
return api_manager.get_all_keys()
def _get_website_analysis(self) -> Optional[Dict[str, Any]]:
"""Get website analysis data."""
"""Get configured API keys from database."""
try:
db = next(get_db())
website_service = WebsiteAnalysisService(db)
return website_service.get_analysis_by_session(self.session_id)
api_keys = self.db_service.get_api_keys(self.user_id, db)
logger.info(f"Retrieved {len(api_keys)} API keys from database for user {self.user_id}")
return api_keys
except Exception as e:
logger.warning(f"Could not get website analysis: {str(e)}")
logger.error(f"Error getting API keys from database: {e}")
return {}
def _get_website_analysis(self) -> Optional[Dict[str, Any]]:
"""Get website analysis data from database (Step 2)."""
try:
db = next(get_db())
website_data = self.db_service.get_website_analysis(self.user_id, db)
if website_data:
logger.info(f"Retrieved website analysis from database for user {self.user_id}")
else:
logger.warning(f"No website analysis found in database for user {self.user_id}")
return website_data
except Exception as e:
logger.error(f"Error getting website analysis from database: {e}")
return None
def _get_research_preferences(self) -> Optional[Dict[str, Any]]:
"""Get research preferences data."""
"""Get research preferences data from database (Step 3)."""
try:
db = next(get_db())
research_service = ResearchPreferencesService(db)
return research_service.get_research_preferences(self.session_id)
research_data = self.db_service.get_research_preferences(self.user_id, db)
if research_data:
logger.info(f"Retrieved research preferences from database for user {self.user_id}")
else:
logger.warning(f"No research preferences found in database for user {self.user_id}")
return research_data
except Exception as e:
logger.warning(f"Could not get research preferences: {str(e)}")
logger.error(f"Error getting research preferences from database: {e}")
return None
def _get_personalization_settings(self, research_preferences: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
"""Get personalization settings from research preferences."""
if not research_preferences:
"""Get personalization settings from Step 4 (Persona) database."""
try:
# Try to get from Step 4 (Persona) in database
db = next(get_db())
persona_data = self.db_service.get_persona_data(self.user_id, db)
if persona_data:
logger.info(f"Retrieved persona data from database for user {self.user_id}")
# Extract personalization settings from persona data
if 'corePersona' in persona_data:
core_persona = persona_data.get('corePersona', {})
return {
'writing_style': core_persona.get('linguistic_fingerprint', {}).get('tone', 'Professional'),
'tone': core_persona.get('tonal_range', {}).get('primary_tone', 'Formal'),
'brand_voice': core_persona.get('identity', {}).get('voice', 'Trustworthy and Expert')
}
# Fallback to research preferences if persona data not available
if research_preferences:
logger.info(f"Using research preferences as fallback for personalization")
return {
'writing_style': research_preferences.get('writing_style', {}).get('tone', 'Professional'),
'tone': research_preferences.get('writing_style', {}).get('voice', 'Formal'),
'brand_voice': research_preferences.get('writing_style', {}).get('complexity', 'Trustworthy and Expert')
}
return None
except Exception as e:
logger.error(f"Error getting personalization settings from database: {e}")
return None
return {
'writing_style': research_preferences.get('writing_style', {}).get('tone', 'Professional'),
'tone': research_preferences.get('writing_style', {}).get('voice', 'Formal'),
'brand_voice': research_preferences.get('writing_style', {}).get('complexity', 'Trustworthy and Expert')
}
def _check_persona_readiness(self, website_analysis: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
"""Check if persona can be generated."""

View File

@@ -0,0 +1,16 @@
-- Migration: Update onboarding_sessions.user_id from INTEGER to STRING
-- This migration updates the user_id column to support Clerk user IDs (strings)
-- Step 1: Alter the user_id column type from INTEGER to VARCHAR(255)
ALTER TABLE onboarding_sessions
ALTER COLUMN user_id TYPE VARCHAR(255);
-- Step 2: Create an index on user_id for faster lookups
CREATE INDEX IF NOT EXISTS idx_onboarding_sessions_user_id ON onboarding_sessions(user_id);
-- Note: This migration assumes no existing data needs to be preserved
-- If you have existing data with integer user_ids, you may need to:
-- 1. Backup the data first
-- 2. Clear the table or convert the integers to strings
-- 3. Then apply this migration

View File

@@ -8,7 +8,7 @@ Base = declarative_base()
class OnboardingSession(Base):
__tablename__ = 'onboarding_sessions'
id = Column(Integer, primary_key=True, autoincrement=True)
user_id = Column(Integer, nullable=False) # Replace with ForeignKey if you have a user table
user_id = Column(String(255), nullable=False) # Clerk user ID (string)
current_step = Column(Integer, default=1)
progress = Column(Float, default=0.0)
started_at = Column(DateTime, default=func.now())
@@ -60,6 +60,8 @@ class WebsiteAnalysis(Base):
target_audience = Column(JSON) # Demographics, expertise level, industry focus
content_type = Column(JSON) # Primary type, secondary types, purpose
recommended_settings = Column(JSON) # Writing tone, target audience, content type
# brand_analysis = Column(JSON) # Brand voice, values, positioning, competitive differentiation
# content_strategy_insights = Column(JSON) # SWOT analysis, strengths, weaknesses, opportunities, threats
# Crawl results
crawl_result = Column(JSON) # Raw crawl data
@@ -90,6 +92,8 @@ class WebsiteAnalysis(Base):
'target_audience': self.target_audience,
'content_type': self.content_type,
'recommended_settings': self.recommended_settings,
# 'brand_analysis': self.brand_analysis,
# 'content_strategy_insights': self.content_strategy_insights,
'crawl_result': self.crawl_result,
'style_patterns': self.style_patterns,
'style_guidelines': self.style_guidelines,

View File

@@ -0,0 +1,82 @@
"""
Add brand_analysis and content_strategy_insights columns to website_analyses table.
These columns store rich brand insights and SWOT analysis from Step 2.
"""
import sys
import os
from pathlib import Path
from loguru import logger
# Add parent directory to path
sys.path.append(str(Path(__file__).parent.parent))
from sqlalchemy import text, inspect
from services.database import SessionLocal, engine
def add_brand_analysis_columns():
"""Add brand_analysis and content_strategy_insights columns if they don't exist."""
db = SessionLocal()
try:
# Check if columns already exist
inspector = inspect(engine)
columns = [col['name'] for col in inspector.get_columns('website_analyses')]
brand_analysis_exists = 'brand_analysis' in columns
content_strategy_insights_exists = 'content_strategy_insights' in columns
if brand_analysis_exists and content_strategy_insights_exists:
logger.info("✅ Columns already exist. No migration needed.")
return True
logger.info("🔄 Starting migration to add brand analysis columns...")
# Add brand_analysis column if missing
if not brand_analysis_exists:
logger.info("Adding brand_analysis column...")
db.execute(text("""
ALTER TABLE website_analyses
ADD COLUMN brand_analysis JSON
"""))
logger.success("✅ Added brand_analysis column")
# Add content_strategy_insights column if missing
if not content_strategy_insights_exists:
logger.info("Adding content_strategy_insights column...")
db.execute(text("""
ALTER TABLE website_analyses
ADD COLUMN content_strategy_insights JSON
"""))
logger.success("✅ Added content_strategy_insights column")
db.commit()
logger.success("🎉 Migration completed successfully!")
return True
except Exception as e:
logger.error(f"❌ Migration failed: {e}")
db.rollback()
return False
finally:
db.close()
if __name__ == "__main__":
logger.info("=" * 60)
logger.info("DATABASE MIGRATION: Add Brand Analysis Columns")
logger.info("=" * 60)
success = add_brand_analysis_columns()
if success:
logger.success("\n✅ Migration completed successfully!")
logger.info("The website_analyses table now includes:")
logger.info(" - brand_analysis: Brand voice, values, positioning")
logger.info(" - content_strategy_insights: SWOT analysis, recommendations")
else:
logger.error("\n❌ Migration failed. Please check the error messages above.")
sys.exit(1)

View File

@@ -0,0 +1,129 @@
"""
Migration Script: Update onboarding_sessions.user_id from INTEGER to STRING
This script updates the database schema to support Clerk user IDs (strings)
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from loguru import logger
from sqlalchemy import text
from services.database import SessionLocal, engine
def migrate_user_id_column():
"""Migrate user_id column from INTEGER to VARCHAR(255)."""
try:
db = SessionLocal()
logger.info("Starting migration: user_id INTEGER -> VARCHAR(255)")
# Check if table exists (SQLite compatible)
check_table_query = """
SELECT name FROM sqlite_master
WHERE type='table' AND name='onboarding_sessions';
"""
result = db.execute(text(check_table_query))
table_exists = result.scalar()
if not table_exists:
logger.warning("Table 'onboarding_sessions' does not exist. Creating it instead.")
# Create tables using the updated models
from models.onboarding import Base
Base.metadata.create_all(bind=engine, checkfirst=True)
logger.success("✅ Created onboarding_sessions table with VARCHAR user_id")
return True
# Check current column type (SQLite compatible)
check_column_query = """
SELECT type FROM pragma_table_info('onboarding_sessions')
WHERE name = 'user_id';
"""
result = db.execute(text(check_column_query))
current_type = result.scalar()
if current_type and 'varchar' in current_type.lower():
logger.info(f"✅ Column user_id is already VARCHAR ({current_type}). No migration needed.")
return True
logger.info(f"Current user_id type: {current_type}")
# Backup existing data count
count_query = "SELECT COUNT(*) FROM onboarding_sessions;"
result = db.execute(text(count_query))
record_count = result.scalar()
logger.info(f"Found {record_count} existing records")
if record_count > 0:
logger.warning("⚠️ Found existing records. Backing up data...")
# You may want to add backup logic here if needed
# SQLite doesn't support ALTER COLUMN TYPE directly
# We need to recreate the table
logger.info("Recreating table with VARCHAR user_id (SQLite limitation)...")
# Backup data
logger.info("Backing up existing data...")
backup_query = """
CREATE TABLE onboarding_sessions_backup AS
SELECT * FROM onboarding_sessions;
"""
db.execute(text(backup_query))
db.commit()
# Drop old table
logger.info("Dropping old table...")
db.execute(text("DROP TABLE onboarding_sessions;"))
db.commit()
# Recreate table with correct schema
logger.info("Creating new table with VARCHAR user_id...")
from models.onboarding import Base
Base.metadata.create_all(bind=engine, tables=[Base.metadata.tables['onboarding_sessions']], checkfirst=False)
db.commit()
# Restore data (converting integers to strings)
logger.info("Restoring data...")
restore_query = """
INSERT INTO onboarding_sessions (id, user_id, current_step, progress, started_at, updated_at)
SELECT id, CAST(user_id AS TEXT), current_step, progress, started_at, updated_at
FROM onboarding_sessions_backup;
"""
db.execute(text(restore_query))
db.commit()
# Drop backup table
logger.info("Cleaning up backup table...")
db.execute(text("DROP TABLE onboarding_sessions_backup;"))
db.commit()
logger.success("✅ Table recreated successfully")
logger.success("🎉 Migration completed successfully!")
return True
except Exception as e:
logger.error(f"❌ Migration failed: {e}")
if db:
db.rollback()
return False
finally:
if db:
db.close()
if __name__ == "__main__":
logger.info("="*60)
logger.info("DATABASE MIGRATION: user_id INTEGER -> VARCHAR(255)")
logger.info("="*60)
success = migrate_user_id_column()
if success:
logger.success("\n✅ Migration completed successfully!")
logger.info("The onboarding system now supports Clerk user IDs (strings)")
else:
logger.error("\n❌ Migration failed. Please check the logs above.")
sys.exit(1)

View File

@@ -0,0 +1,73 @@
"""
Verify current user data in the database
Check if data is being saved with Clerk user IDs
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from loguru import logger
from services.database import SessionLocal
from models.onboarding import OnboardingSession, APIKey, WebsiteAnalysis, ResearchPreferences
def verify_user_data():
"""Check what user_id format is being used."""
try:
db = SessionLocal()
logger.info("Checking onboarding_sessions table...")
sessions = db.query(OnboardingSession).all()
logger.info(f"Found {len(sessions)} sessions:")
for session in sessions:
logger.info(f" Session ID: {session.id}")
logger.info(f" User ID: {session.user_id} (type: {type(session.user_id).__name__})")
logger.info(f" Current Step: {session.current_step}")
logger.info(f" Progress: {session.progress}%")
# Check API keys for this session
api_keys = db.query(APIKey).filter(APIKey.session_id == session.id).all()
logger.info(f" API Keys: {len(api_keys)} found")
for key in api_keys:
logger.info(f" - {key.provider}")
# Check website analysis
website = db.query(WebsiteAnalysis).filter(WebsiteAnalysis.session_id == session.id).first()
if website:
logger.info(f" Website Analysis: {website.website_url}")
else:
logger.info(f" Website Analysis: None")
# Check research preferences
research = db.query(ResearchPreferences).filter(ResearchPreferences.session_id == session.id).first()
if research:
logger.info(f" Research Preferences: Found")
else:
logger.info(f" Research Preferences: None")
logger.info("")
if len(sessions) == 0:
logger.warning("⚠️ No sessions found in database!")
logger.info("This means either:")
logger.info(" 1. No onboarding data has been saved yet")
logger.info(" 2. Data was cleared during migration")
logger.info("\nYou need to go through onboarding steps 1-5 again to save data with Clerk user ID")
return True
except Exception as e:
logger.error(f"Error verifying data: {e}")
return False
finally:
if db:
db.close()
if __name__ == "__main__":
logger.info("="*60)
logger.info("VERIFY CURRENT USER DATA IN DATABASE")
logger.info("="*60)
verify_user_data()

View File

@@ -170,8 +170,36 @@ class OnboardingProgress:
required_steps = [1, 2, 3, 6] # Steps 1, 2, 3, and 6 are required
for step_num in required_steps:
step = self.get_step_data(step_num)
if step and step.status not in [StepStatus.COMPLETED, StepStatus.SKIPPED]:
return False
if step and step.status in [StepStatus.COMPLETED, StepStatus.SKIPPED]:
continue
# DB-aware fallback for steps 2 and 3
try:
from services.onboarding_database_service import OnboardingDatabaseService
from services.database import get_db
db = next(get_db())
db_service = OnboardingDatabaseService(db)
if step_num == 2:
w = db_service.get_website_analysis(self.user_id, db)
if w and (w.get('website_url') or w.get('writing_style')):
# Mark as completed to normalize state
try:
self.mark_step_completed(2, {'source': 'db-fallback'})
except Exception:
pass
continue
if step_num == 3:
p = db_service.get_research_preferences(self.user_id, db)
if p and p.get('research_depth'):
try:
self.mark_step_completed(3, {'source': 'db-fallback'})
except Exception:
pass
continue
except Exception:
pass
return False
return True
def get_completion_percentage(self) -> float:

View File

@@ -5,10 +5,13 @@ This replaces the JSON file-based storage with proper database persistence.
"""
from typing import Dict, Any, Optional, List
import os
import json
from datetime import datetime
from loguru import logger
from sqlalchemy.orm import Session
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import text
from models.onboarding import OnboardingSession, APIKey, WebsiteAnalysis, ResearchPreferences, PersonaData
from services.database import get_db
@@ -20,6 +23,85 @@ class OnboardingDatabaseService:
def __init__(self, db: Session = None):
"""Initialize with optional database session."""
self.db = db
# Cache for schema feature detection
self._brand_cols_checked: bool = False
self._brand_cols_available: bool = False
# --- Feature flags and schema detection helpers ---
def _brand_feature_enabled(self) -> bool:
"""Check if writing brand-related columns is enabled via env flag."""
return os.getenv('ENABLE_WEBSITE_BRAND_COLUMNS', 'true').lower() in {'1', 'true', 'yes', 'on'}
def _ensure_brand_column_detection(self, session_db: Session) -> None:
"""Detect at runtime whether brand columns exist and cache the result."""
if self._brand_cols_checked:
return
try:
# This works across SQLite/Postgres; LIMIT 0 avoids scanning
session_db.execute(text('SELECT brand_analysis, content_strategy_insights FROM website_analyses LIMIT 0'))
self._brand_cols_available = True
except Exception:
self._brand_cols_available = False
finally:
self._brand_cols_checked = True
def _maybe_update_brand_columns(self, session_db: Session, session_id: int, brand_analysis: Any, content_strategy_insights: Any) -> None:
"""Safely update brand columns using raw SQL if feature enabled and columns exist."""
if not self._brand_feature_enabled():
return
self._ensure_brand_column_detection(session_db)
if not self._brand_cols_available:
return
try:
session_db.execute(
text('''
UPDATE website_analyses
SET brand_analysis = :brand_analysis,
content_strategy_insights = :content_strategy_insights
WHERE session_id = :session_id
'''),
{
'brand_analysis': json.dumps(brand_analysis) if brand_analysis is not None else None,
'content_strategy_insights': json.dumps(content_strategy_insights) if content_strategy_insights is not None else None,
'session_id': session_id,
}
)
except Exception as e:
logger.warning(f"Skipped updating brand columns (not critical): {e}")
def _maybe_attach_brand_columns(self, session_db: Session, session_id: int, result: Dict[str, Any]) -> None:
"""Optionally read brand columns and attach to result if available."""
if not self._brand_feature_enabled():
return
self._ensure_brand_column_detection(session_db)
if not self._brand_cols_available:
return
try:
row = session_db.execute(
text('''
SELECT brand_analysis, content_strategy_insights
FROM website_analyses WHERE session_id = :session_id LIMIT 1
'''),
{'session_id': session_id}
).mappings().first()
if row:
brand = row.get('brand_analysis')
insights = row.get('content_strategy_insights')
# If stored as TEXT in SQLite, try to parse JSON
if isinstance(brand, str):
try:
brand = json.loads(brand)
except Exception:
pass
if isinstance(insights, str):
try:
insights = json.loads(insights)
except Exception:
pass
result['brand_analysis'] = brand
result['content_strategy_insights'] = insights
except Exception as e:
logger.warning(f"Skipped reading brand columns (not critical): {e}")
def get_or_create_session(self, user_id: str, db: Session = None) -> OnboardingSession:
"""Get existing onboarding session or create new one for user."""
@@ -178,6 +260,24 @@ class OnboardingDatabaseService:
try:
session = self.get_or_create_session(user_id, session_db)
# Normalize payload. Step 2 sometimes sends { website, analysis: {...} }
# while DB expects flattened fields. Support both shapes.
incoming = analysis_data or {}
nested = incoming.get('analysis') if isinstance(incoming.get('analysis'), dict) else None
normalized = {
'website_url': incoming.get('website') or incoming.get('website_url') or '',
'writing_style': (nested or incoming).get('writing_style'),
'content_characteristics': (nested or incoming).get('content_characteristics'),
'target_audience': (nested or incoming).get('target_audience'),
'content_type': (nested or incoming).get('content_type'),
'recommended_settings': (nested or incoming).get('recommended_settings'),
'brand_analysis': (nested or incoming).get('brand_analysis'),
'content_strategy_insights': (nested or incoming).get('content_strategy_insights'),
'crawl_result': (nested or incoming).get('crawl_result'),
'style_patterns': (nested or incoming).get('style_patterns'),
'style_guidelines': (nested or incoming).get('style_guidelines'),
'status': (nested or incoming).get('status', incoming.get('status', 'completed')),
}
# Check if analysis already exists
existing = session_db.query(WebsiteAnalysis).filter(
@@ -186,37 +286,46 @@ class OnboardingDatabaseService:
if existing:
# Update existing
existing.website_url = analysis_data.get('website_url', existing.website_url)
existing.writing_style = analysis_data.get('writing_style')
existing.content_characteristics = analysis_data.get('content_characteristics')
existing.target_audience = analysis_data.get('target_audience')
existing.content_type = analysis_data.get('content_type')
existing.recommended_settings = analysis_data.get('recommended_settings')
existing.crawl_result = analysis_data.get('crawl_result')
existing.style_patterns = analysis_data.get('style_patterns')
existing.style_guidelines = analysis_data.get('style_guidelines')
existing.status = analysis_data.get('status', 'completed')
existing.website_url = normalized.get('website_url', existing.website_url)
existing.writing_style = normalized.get('writing_style')
existing.content_characteristics = normalized.get('content_characteristics')
existing.target_audience = normalized.get('target_audience')
existing.content_type = normalized.get('content_type')
existing.recommended_settings = normalized.get('recommended_settings')
existing.crawl_result = normalized.get('crawl_result')
existing.style_patterns = normalized.get('style_patterns')
existing.style_guidelines = normalized.get('style_guidelines')
existing.status = normalized.get('status', 'completed')
existing.updated_at = datetime.now()
logger.info(f"Updated website analysis for user {user_id}")
else:
# Create new
analysis = WebsiteAnalysis(
session_id=session.id,
website_url=analysis_data.get('website_url', ''),
writing_style=analysis_data.get('writing_style'),
content_characteristics=analysis_data.get('content_characteristics'),
target_audience=analysis_data.get('target_audience'),
content_type=analysis_data.get('content_type'),
recommended_settings=analysis_data.get('recommended_settings'),
crawl_result=analysis_data.get('crawl_result'),
style_patterns=analysis_data.get('style_patterns'),
style_guidelines=analysis_data.get('style_guidelines'),
status=analysis_data.get('status', 'completed')
website_url=normalized.get('website_url', ''),
writing_style=normalized.get('writing_style'),
content_characteristics=normalized.get('content_characteristics'),
target_audience=normalized.get('target_audience'),
content_type=normalized.get('content_type'),
recommended_settings=normalized.get('recommended_settings'),
crawl_result=normalized.get('crawl_result'),
style_patterns=normalized.get('style_patterns'),
style_guidelines=normalized.get('style_guidelines'),
status=normalized.get('status', 'completed')
)
session_db.add(analysis)
logger.info(f"Created website analysis for user {user_id}")
session_db.commit()
# Optional brand column update via raw SQL (feature-flagged)
self._maybe_update_brand_columns(
session_db=session_db,
session_id=session.id,
brand_analysis=normalized.get('brand_analysis'),
content_strategy_insights=normalized.get('content_strategy_insights')
)
session_db.commit()
return True
except SQLAlchemyError as e:
@@ -239,7 +348,11 @@ class OnboardingDatabaseService:
WebsiteAnalysis.session_id == session.id
).first()
return analysis.to_dict() if analysis else None
result = analysis.to_dict() if analysis else None
if result:
# Optionally include brand fields without touching ORM mapping
self._maybe_attach_brand_columns(session_db, session.id, result)
return result
except SQLAlchemyError as e:
logger.error(f"Error getting website analysis: {e}")
@@ -358,6 +471,36 @@ class OnboardingDatabaseService:
logger.error(f"Error getting research preferences: {e}")
return None
def get_persona_data(self, user_id: str, db: Session = None) -> Optional[Dict[str, Any]]:
"""Get persona data for user."""
session_db = db or self.db
if not session_db:
raise ValueError("Database session required")
try:
session = self.get_session_by_user(user_id, session_db)
if not session:
return None
persona = session_db.query(PersonaData).filter(
PersonaData.session_id == session.id
).first()
if not persona:
return None
# Return persona data in the expected format
return {
'corePersona': persona.core_persona,
'platformPersonas': persona.platform_personas,
'qualityMetrics': persona.quality_metrics,
'selectedPlatforms': persona.selected_platforms
}
except SQLAlchemyError as e:
logger.error(f"Error getting persona data: {e}")
return None
def mark_onboarding_complete(self, user_id: str, db: Session = None) -> bool:
"""Mark onboarding as complete for user."""
session_db = db or self.db