839 lines
43 KiB
Python
839 lines
43 KiB
Python
"""
|
|
Step Management Service
|
|
Handles onboarding step operations and progress tracking.
|
|
"""
|
|
|
|
from typing import Dict, Any, List, Optional
|
|
from datetime import datetime
|
|
from fastapi import HTTPException
|
|
from loguru import logger
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
|
|
from api.content_planning.services.content_strategy.onboarding import OnboardingDataIntegrationService
|
|
from services.database import get_db
|
|
from models.onboarding import OnboardingSession, APIKey, WebsiteAnalysis, ResearchPreferences, PersonaData, CompetitorAnalysis
|
|
from services.intelligence.agent_flat_context import AgentFlatContextStore
|
|
|
|
class StepManagementService:
|
|
"""Service for handling onboarding step management."""
|
|
|
|
def __init__(self):
|
|
self.integration_service = OnboardingDataIntegrationService()
|
|
|
|
def _get_or_create_session(self, user_id: str, db: Session) -> OnboardingSession:
|
|
"""Get or create onboarding session."""
|
|
session = db.query(OnboardingSession).filter(
|
|
OnboardingSession.user_id == user_id
|
|
).first()
|
|
|
|
if not session:
|
|
session = OnboardingSession(
|
|
user_id=user_id,
|
|
current_step=1,
|
|
progress=0.0,
|
|
started_at=datetime.utcnow(),
|
|
updated_at=datetime.utcnow()
|
|
)
|
|
db.add(session)
|
|
db.commit()
|
|
db.refresh(session)
|
|
|
|
return session
|
|
|
|
def _save_api_key(self, user_id: str, provider: str, api_key: str, db: Session) -> bool:
|
|
"""Save API key directly to database."""
|
|
try:
|
|
session = self._get_or_create_session(user_id, db)
|
|
|
|
existing_key = db.query(APIKey).filter(
|
|
APIKey.session_id == session.id,
|
|
APIKey.provider == provider
|
|
).first()
|
|
|
|
if existing_key:
|
|
existing_key.key = api_key
|
|
existing_key.updated_at = datetime.utcnow()
|
|
else:
|
|
new_key = APIKey(
|
|
session_id=session.id,
|
|
provider=provider,
|
|
key=api_key
|
|
)
|
|
db.add(new_key)
|
|
|
|
db.commit()
|
|
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Error saving API key for user {user_id}: {e}")
|
|
db.rollback()
|
|
raise e
|
|
|
|
def _save_website_analysis(self, user_id: str, analysis_data: Dict[str, Any], db: Session) -> bool:
|
|
"""Save website analysis directly to database."""
|
|
try:
|
|
session = self._get_or_create_session(user_id, db)
|
|
|
|
# Normalize payload
|
|
incoming = analysis_data or {}
|
|
nested = incoming.get('analysis') if isinstance(incoming.get('analysis'), dict) else None
|
|
|
|
# Extract extra fields
|
|
brand_analysis = (nested or incoming).get('brand_analysis')
|
|
content_strategy_insights = (nested or incoming).get('content_strategy_insights')
|
|
meta_info = (nested or incoming).get('meta_info')
|
|
|
|
# Fix: Check both nested and incoming for social_media_presence
|
|
social_media_presence = (nested or {}).get('social_media_presence') or incoming.get('social_media_presence')
|
|
|
|
seo_audit = (nested or incoming).get('seo_audit')
|
|
style_patterns = (nested or incoming).get('style_patterns')
|
|
style_guidelines = (nested or incoming).get('guidelines')
|
|
sitemap_analysis = (nested or incoming).get('sitemap_analysis')
|
|
|
|
# Prepare crawl_result
|
|
crawl_result = incoming.get('crawl_result') or {}
|
|
if not isinstance(crawl_result, dict):
|
|
crawl_result = {"raw": crawl_result}
|
|
|
|
# Meta info still goes to crawl_result as we didn't add a column for it
|
|
if meta_info:
|
|
crawl_result['meta_info'] = meta_info
|
|
|
|
# Store sitemap_analysis in crawl_result as we don't have a dedicated column yet
|
|
if sitemap_analysis:
|
|
crawl_result['sitemap_analysis'] = sitemap_analysis
|
|
|
|
normalized = {
|
|
'website_url': incoming.get('website') or incoming.get('website_url') or '',
|
|
'writing_style': (nested or incoming).get('writing_style'),
|
|
'content_characteristics': (nested or incoming).get('content_characteristics'),
|
|
'target_audience': (nested or incoming).get('target_audience'),
|
|
'content_type': (nested or incoming).get('content_type'),
|
|
'recommended_settings': (nested or incoming).get('recommended_settings'),
|
|
'brand_analysis': brand_analysis,
|
|
'content_strategy_insights': content_strategy_insights,
|
|
'social_media_presence': social_media_presence,
|
|
'crawl_result': crawl_result,
|
|
'seo_audit': seo_audit,
|
|
'style_patterns': style_patterns,
|
|
'style_guidelines': style_guidelines
|
|
}
|
|
|
|
# Filter only valid columns to prevent TypeError
|
|
valid_columns = [c.name for c in WebsiteAnalysis.__table__.columns if c.name not in ['id', 'session_id', 'created_at', 'updated_at']]
|
|
filtered_data = {k: v for k, v in normalized.items() if k in valid_columns and v is not None}
|
|
|
|
existing_analysis = db.query(WebsiteAnalysis).filter(
|
|
WebsiteAnalysis.session_id == session.id
|
|
).first()
|
|
|
|
if existing_analysis:
|
|
for key, value in filtered_data.items():
|
|
setattr(existing_analysis, key, value)
|
|
existing_analysis.updated_at = datetime.utcnow()
|
|
else:
|
|
new_analysis = WebsiteAnalysis(
|
|
session_id=session.id,
|
|
**filtered_data
|
|
)
|
|
db.add(new_analysis)
|
|
|
|
db.commit()
|
|
|
|
# Persist Step 2 snapshot to agent flat-file context for ultra-fast reads
|
|
try:
|
|
flat_store = AgentFlatContextStore(user_id)
|
|
canonical_payload = {
|
|
"website_url": filtered_data.get("website_url") or incoming.get("website") or incoming.get("website_url"),
|
|
"analysis_date": datetime.utcnow().isoformat(),
|
|
"status": (nested or incoming).get("status") or "completed",
|
|
"error_message": (nested or incoming).get("error_message"),
|
|
"warning_message": (nested or incoming).get("warning_message"),
|
|
"writing_style": filtered_data.get("writing_style"),
|
|
"content_characteristics": filtered_data.get("content_characteristics"),
|
|
"target_audience": filtered_data.get("target_audience"),
|
|
"content_type": filtered_data.get("content_type"),
|
|
"recommended_settings": filtered_data.get("recommended_settings"),
|
|
"brand_analysis": filtered_data.get("brand_analysis"),
|
|
"content_strategy_insights": filtered_data.get("content_strategy_insights"),
|
|
"social_media_presence": filtered_data.get("social_media_presence"),
|
|
"style_patterns": filtered_data.get("style_patterns"),
|
|
"style_guidelines": filtered_data.get("style_guidelines"),
|
|
"seo_audit": filtered_data.get("seo_audit"),
|
|
"strategic_insights_history": (nested or incoming).get("strategic_insights_history"),
|
|
"crawl_result": filtered_data.get("crawl_result"),
|
|
"meta_info": meta_info,
|
|
"sitemap_analysis": sitemap_analysis,
|
|
"raw_step2_payload": incoming,
|
|
"raw_analysis_payload": nested or incoming,
|
|
"saved_at": datetime.utcnow().isoformat(),
|
|
}
|
|
flat_store.save_step2_website_analysis(canonical_payload, source="onboarding_step2")
|
|
except Exception as flat_err:
|
|
logger.warning(f"Failed to persist step 2 flat context for user {user_id}: {flat_err}")
|
|
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Error saving website analysis for user {user_id}: {e}")
|
|
db.rollback()
|
|
raise e
|
|
|
|
def _save_research_preferences(self, user_id: str, research_data: Dict[str, Any], db: Session) -> bool:
|
|
"""Save research preferences directly to database."""
|
|
try:
|
|
session = self._get_or_create_session(user_id, db)
|
|
|
|
# Add defaults for required fields if missing to prevent 500 errors
|
|
# The frontend Step 3 (Competitor Analysis) might not send these
|
|
if 'research_depth' not in research_data:
|
|
research_data['research_depth'] = 'Comprehensive'
|
|
if 'content_types' not in research_data:
|
|
research_data['content_types'] = ["Blog Posts", "Social Media", "Newsletters"]
|
|
if 'auto_research' not in research_data:
|
|
research_data['auto_research'] = True
|
|
if 'factual_content' not in research_data:
|
|
research_data['factual_content'] = True
|
|
|
|
existing_prefs = db.query(ResearchPreferences).filter(
|
|
ResearchPreferences.session_id == session.id
|
|
).first()
|
|
|
|
if existing_prefs:
|
|
# Fix for SQLite DateTime issue: Ensure created_at is a datetime object
|
|
if hasattr(existing_prefs, 'created_at') and isinstance(existing_prefs.created_at, str):
|
|
try:
|
|
existing_prefs.created_at = datetime.fromisoformat(existing_prefs.created_at)
|
|
except (ValueError, TypeError):
|
|
pass
|
|
|
|
for key, value in research_data.items():
|
|
# Skip metadata fields and id
|
|
if key in ['id', 'session_id', 'created_at', 'updated_at']:
|
|
continue
|
|
|
|
if hasattr(existing_prefs, key) and value is not None:
|
|
setattr(existing_prefs, key, value)
|
|
existing_prefs.updated_at = datetime.utcnow()
|
|
else:
|
|
# Filter valid columns only to avoid errors
|
|
valid_columns = [c.name for c in ResearchPreferences.__table__.columns if c.name not in ['id', 'session_id', 'created_at', 'updated_at']]
|
|
filtered_data = {k: v for k, v in research_data.items() if k in valid_columns}
|
|
|
|
new_prefs = ResearchPreferences(
|
|
session_id=session.id,
|
|
**filtered_data
|
|
)
|
|
db.add(new_prefs)
|
|
|
|
db.commit()
|
|
|
|
# Persist Step 3 snapshot to agent flat-file context
|
|
try:
|
|
flat_store = AgentFlatContextStore(user_id)
|
|
canonical_payload = {
|
|
"research_depth": research_data.get("research_depth"),
|
|
"content_types": research_data.get("content_types") or [],
|
|
"auto_research": research_data.get("auto_research", True),
|
|
"factual_content": research_data.get("factual_content", True),
|
|
"writing_style": research_data.get("writing_style") or {},
|
|
"content_characteristics": research_data.get("content_characteristics") or {},
|
|
"target_audience": research_data.get("target_audience") or {},
|
|
"recommended_settings": research_data.get("recommended_settings") or {},
|
|
"industry_context": research_data.get("industry_context") or research_data.get("industryContext"),
|
|
"competitors": research_data.get("competitors") if isinstance(research_data.get("competitors"), list) else [],
|
|
"saved_at": datetime.utcnow().isoformat(),
|
|
"source_payload": research_data,
|
|
}
|
|
flat_store.save_step3_research_preferences(canonical_payload, source="onboarding_step3")
|
|
except Exception as flat_err:
|
|
logger.warning(f"Failed to persist step 3 flat context for user {user_id}: {flat_err}")
|
|
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Error saving research preferences for user {user_id}: {e}")
|
|
db.rollback()
|
|
raise e
|
|
|
|
def _save_competitor_analysis(self, user_id: str, competitors: List[Dict[str, Any]], industry_context: Optional[str], db: Session) -> bool:
|
|
"""Save competitor analysis results to database."""
|
|
try:
|
|
session = self._get_or_create_session(user_id, db)
|
|
|
|
logger.info(f"🔍 COMPETITOR SAVE: Starting to save {len(competitors)} competitors for session {session.id}")
|
|
|
|
saved_count = 0
|
|
failed_count = 0
|
|
|
|
for idx, competitor in enumerate(competitors):
|
|
try:
|
|
if not competitor or not isinstance(competitor, dict):
|
|
logger.warning(f" ⚠️ Skipping invalid competitor entry at index {idx}: {competitor}")
|
|
continue
|
|
|
|
# Use full URL (Text column supports it) and clean it
|
|
raw_url = competitor.get("url", "")
|
|
competitor_url = raw_url.strip().strip('`').strip() if raw_url else ""
|
|
|
|
# Prepare analysis data
|
|
analysis_data = {
|
|
"title": competitor.get("title", ""),
|
|
"summary": competitor.get("summary", ""),
|
|
"relevance_score": competitor.get("relevance_score", 0.5),
|
|
"highlights": competitor.get("highlights", []),
|
|
"subpages": competitor.get("subpages", []),
|
|
"favicon": competitor.get("favicon"),
|
|
"image": competitor.get("image"),
|
|
"published_date": competitor.get("published_date"),
|
|
"author": competitor.get("author"),
|
|
"competitive_analysis": competitor.get("competitive_analysis") or competitor.get("competitive_insights", {}),
|
|
"content_insights": competitor.get("content_insights", {}),
|
|
"industry_context": industry_context,
|
|
"completed_at": datetime.utcnow().isoformat()
|
|
}
|
|
|
|
# Check if competitor already exists for this session
|
|
existing_competitor = db.query(CompetitorAnalysis).filter(
|
|
CompetitorAnalysis.session_id == session.id,
|
|
CompetitorAnalysis.competitor_url == competitor.get("url", "")
|
|
).first()
|
|
|
|
has_details = bool(analysis_data.get("summary") or analysis_data.get("highlights"))
|
|
detail_msg = "with rich details" if has_details else "basic info only"
|
|
|
|
if existing_competitor:
|
|
existing_competitor.analysis_data = analysis_data
|
|
existing_competitor.updated_at = datetime.utcnow()
|
|
logger.info(f" Updated existing competitor {idx + 1} ({detail_msg})")
|
|
else:
|
|
competitor_record = CompetitorAnalysis(
|
|
session_id=session.id,
|
|
competitor_url=competitor_url,
|
|
competitor_domain=competitor.get("domain", ""),
|
|
analysis_data=analysis_data,
|
|
status="completed"
|
|
)
|
|
db.add(competitor_record)
|
|
logger.info(f" Added new competitor {idx + 1} ({detail_msg})")
|
|
|
|
saved_count += 1
|
|
|
|
except Exception as e:
|
|
failed_count += 1
|
|
logger.error(f" ❌ Failed to save competitor {idx + 1}: {str(e)}")
|
|
|
|
db.commit()
|
|
logger.info(f"✅ Saved {saved_count} competitors ({failed_count} failed)")
|
|
|
|
# Refresh Step 3 flat context with competitor details saved by this flow
|
|
try:
|
|
flat_store = AgentFlatContextStore(user_id)
|
|
existing_doc = flat_store.load_step3_context_document() or {}
|
|
existing_data = existing_doc.get("data") if isinstance(existing_doc, dict) and isinstance(existing_doc.get("data"), dict) else {}
|
|
merged_payload = {
|
|
**existing_data,
|
|
"competitors": competitors,
|
|
"industry_context": industry_context or existing_data.get("industry_context"),
|
|
"competitors_saved_at": datetime.utcnow().isoformat(),
|
|
}
|
|
flat_store.save_step3_research_preferences(merged_payload, source="onboarding_step3_competitors")
|
|
except Exception as flat_err:
|
|
logger.warning(f"Failed to refresh step 3 competitor flat context for user {user_id}: {flat_err}")
|
|
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Error saving competitor analysis for user {user_id}: {e}")
|
|
db.rollback()
|
|
raise e
|
|
|
|
|
|
|
|
def _save_step5_integrations_context(self, user_id: str, step5_data: Dict[str, Any]) -> bool:
|
|
"""Persist Step 5 integrations context to flat-file store."""
|
|
try:
|
|
flat_store = AgentFlatContextStore(user_id)
|
|
canonical_payload = {
|
|
"integrations": step5_data.get("integrations") if isinstance(step5_data.get("integrations"), dict) else {},
|
|
"providers": step5_data.get("providers") if isinstance(step5_data.get("providers"), list) else [],
|
|
"connected_accounts": step5_data.get("connectedAccounts") if isinstance(step5_data.get("connectedAccounts"), list) else [],
|
|
"integration_status": step5_data.get("status") or step5_data.get("integrationStatus"),
|
|
"notes": step5_data.get("notes") or step5_data.get("integrationNotes"),
|
|
"saved_at": datetime.utcnow().isoformat(),
|
|
"source_payload": step5_data,
|
|
}
|
|
return flat_store.save_step5_integrations(canonical_payload, source="onboarding_step5")
|
|
except Exception as e:
|
|
logger.warning(f"Failed to save Step 5 integrations context for user {user_id}: {e}")
|
|
return False
|
|
|
|
def _save_persona_data(self, user_id: str, persona_data: Dict[str, Any], db: Session) -> bool:
|
|
"""Save persona data directly to database."""
|
|
try:
|
|
session = self._get_or_create_session(user_id, db)
|
|
|
|
existing = db.query(PersonaData).filter(
|
|
PersonaData.session_id == session.id
|
|
).first()
|
|
|
|
if existing:
|
|
existing.core_persona = persona_data.get('corePersona')
|
|
existing.platform_personas = persona_data.get('platformPersonas')
|
|
existing.quality_metrics = persona_data.get('qualityMetrics')
|
|
existing.selected_platforms = persona_data.get('selectedPlatforms', [])
|
|
existing.updated_at = datetime.utcnow()
|
|
else:
|
|
persona = PersonaData(
|
|
session_id=session.id,
|
|
core_persona=persona_data.get('corePersona'),
|
|
platform_personas=persona_data.get('platformPersonas'),
|
|
quality_metrics=persona_data.get('qualityMetrics'),
|
|
selected_platforms=persona_data.get('selectedPlatforms', [])
|
|
)
|
|
db.add(persona)
|
|
|
|
db.commit()
|
|
|
|
# Persist Step 4 snapshot to agent flat-file context
|
|
try:
|
|
flat_store = AgentFlatContextStore(user_id)
|
|
canonical_payload = {
|
|
"core_persona": persona_data.get("corePersona") or {},
|
|
"platform_personas": persona_data.get("platformPersonas") or {},
|
|
"quality_metrics": persona_data.get("qualityMetrics") or {},
|
|
"selected_platforms": persona_data.get("selectedPlatforms", []),
|
|
"research_persona": persona_data.get("researchPersona") or persona_data.get("research_persona"),
|
|
"persona_generation_notes": persona_data.get("personaGenerationNotes") or persona_data.get("persona_generation_notes"),
|
|
"saved_at": datetime.utcnow().isoformat(),
|
|
"source_payload": persona_data,
|
|
}
|
|
flat_store.save_step4_persona_data(canonical_payload, source="onboarding_step4")
|
|
except Exception as flat_err:
|
|
logger.warning(f"Failed to persist step 4 flat context for user {user_id}: {flat_err}")
|
|
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Error saving persona data for user {user_id}: {e}")
|
|
db.rollback()
|
|
raise e
|
|
|
|
async def get_onboarding_status(self, current_user: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Get the current onboarding status (per user)."""
|
|
try:
|
|
from services.onboarding.progress_service import OnboardingProgressService
|
|
user_id = str(current_user.get('id'))
|
|
status = OnboardingProgressService().get_onboarding_status(user_id)
|
|
return {
|
|
"is_completed": status["is_completed"],
|
|
"current_step": status["current_step"],
|
|
"completion_percentage": status["completion_percentage"],
|
|
"next_step": 6 if status["is_completed"] else max(1, status["current_step"]),
|
|
"started_at": status["started_at"],
|
|
"completed_at": status["completed_at"],
|
|
"can_proceed_to_final": True if status["is_completed"] else status["current_step"] >= 5,
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error getting onboarding status: {str(e)}")
|
|
raise HTTPException(status_code=500, detail="Internal server error")
|
|
|
|
async def get_onboarding_progress_full(self, current_user: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Get the full onboarding progress data."""
|
|
try:
|
|
from services.onboarding.progress_service import OnboardingProgressService
|
|
user_id = str(current_user.get('id'))
|
|
progress_service = OnboardingProgressService()
|
|
status = progress_service.get_onboarding_status(user_id)
|
|
data = progress_service.get_completion_data(user_id)
|
|
|
|
def completed(b: bool) -> str:
|
|
return 'completed' if b else 'pending'
|
|
|
|
api_keys = data.get('api_keys') or {}
|
|
website = data.get('website_analysis') or {}
|
|
research = data.get('research_preferences') or {}
|
|
persona = data.get('persona_data') or {}
|
|
|
|
steps = [
|
|
{
|
|
"step_number": 1,
|
|
"title": "API Keys",
|
|
"description": "Connect your AI services",
|
|
"status": completed(any(v for v in api_keys.values() if v)),
|
|
"completed_at": None,
|
|
"data": None,
|
|
"validation_errors": []
|
|
},
|
|
{
|
|
"step_number": 2,
|
|
"title": "Website",
|
|
"description": "Set up your website",
|
|
"status": completed(bool(website.get('website_url') or website.get('writing_style'))),
|
|
"completed_at": None,
|
|
"data": website or None,
|
|
"validation_errors": []
|
|
},
|
|
{
|
|
"step_number": 3,
|
|
"title": "Research",
|
|
"description": "Discover competitors",
|
|
"status": completed(bool(research.get('research_depth') or research.get('content_types'))),
|
|
"completed_at": None,
|
|
"data": research or None,
|
|
"validation_errors": []
|
|
},
|
|
{
|
|
"step_number": 4,
|
|
"title": "Personalization",
|
|
"description": "Customize your experience",
|
|
"status": completed(bool(persona.get('corePersona') or persona.get('platformPersonas'))),
|
|
"completed_at": None,
|
|
"data": persona or None,
|
|
"validation_errors": []
|
|
},
|
|
{
|
|
"step_number": 5,
|
|
"title": "Integrations",
|
|
"description": "Connect additional services",
|
|
"status": completed(status['current_step'] >= 5),
|
|
"completed_at": None,
|
|
"data": None,
|
|
"validation_errors": []
|
|
},
|
|
{
|
|
"step_number": 6,
|
|
"title": "Finish",
|
|
"description": "Complete setup",
|
|
"status": completed(status['is_completed']),
|
|
"completed_at": status['completed_at'],
|
|
"data": None,
|
|
"validation_errors": []
|
|
}
|
|
]
|
|
|
|
return {
|
|
"steps": steps,
|
|
"current_step": 6 if status['is_completed'] else status['current_step'],
|
|
"started_at": status['started_at'],
|
|
"last_updated": status['last_updated'],
|
|
"is_completed": status['is_completed'],
|
|
"completed_at": status['completed_at'],
|
|
"completion_percentage": status['completion_percentage']
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error getting onboarding progress: {str(e)}")
|
|
raise HTTPException(status_code=500, detail="Internal server error")
|
|
|
|
async def get_step_data(self, step_number: int, current_user: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Get data for a specific step."""
|
|
try:
|
|
user_id = str(current_user.get('clerk_user_id') or current_user.get('id'))
|
|
db = next(get_db(current_user))
|
|
|
|
# Use SSOT for reading step data
|
|
integrated_data = self.integration_service.get_integrated_data_sync(user_id, db)
|
|
|
|
if step_number == 2:
|
|
website = integrated_data.get('website_analysis', {})
|
|
return {
|
|
"step_number": 2,
|
|
"title": "Website",
|
|
"description": "Set up your website",
|
|
"status": 'completed' if (website.get('website_url') or website.get('writing_style')) else 'pending',
|
|
"completed_at": None,
|
|
"data": website,
|
|
"validation_errors": []
|
|
}
|
|
if step_number == 3:
|
|
research = integrated_data.get('research_preferences', {})
|
|
competitors = integrated_data.get('competitor_analysis', [])
|
|
website = integrated_data.get('website_analysis', {})
|
|
social_media = website.get('social_media_presence') or website.get('social_media_accounts', {})
|
|
|
|
# Merge competitors into the data
|
|
step_data = research.copy() if research else {}
|
|
step_data['competitors'] = competitors
|
|
step_data['social_media_accounts'] = social_media
|
|
|
|
return {
|
|
"step_number": 3,
|
|
"title": "Research",
|
|
"description": "Discover competitors",
|
|
"status": 'completed' if (research.get('research_depth') or research.get('content_types') or competitors) else 'pending',
|
|
"completed_at": None,
|
|
"data": step_data,
|
|
"validation_errors": []
|
|
}
|
|
if step_number == 4:
|
|
persona = integrated_data.get('persona_data', {})
|
|
return {
|
|
"step_number": 4,
|
|
"title": "Personalization",
|
|
"description": "Customize your experience",
|
|
"status": 'completed' if (persona.get('corePersona') or persona.get('platformPersonas')) else 'pending',
|
|
"completed_at": None,
|
|
"data": persona,
|
|
"validation_errors": []
|
|
}
|
|
|
|
from services.onboarding.progress_service import OnboardingProgressService
|
|
status = OnboardingProgressService().get_onboarding_status(user_id)
|
|
mapping = {
|
|
1: ('API Keys', 'Connect your AI services', status['current_step'] >= 1),
|
|
5: ('Integrations', 'Connect additional services', status['current_step'] >= 5),
|
|
6: ('Finish', 'Complete setup', status['is_completed'])
|
|
}
|
|
title, description, done = mapping.get(step_number, (f'Step {step_number}', 'Onboarding step', False))
|
|
return {
|
|
"step_number": step_number,
|
|
"title": title,
|
|
"description": description,
|
|
"status": 'completed' if done else 'pending',
|
|
"completed_at": status['completed_at'] if step_number == 6 and done else None,
|
|
"data": None,
|
|
"validation_errors": []
|
|
}
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error getting step data: {str(e)}")
|
|
raise HTTPException(status_code=500, detail="Internal server error")
|
|
|
|
async def complete_step(self, step_number: int, request_data: Dict[str, Any], current_user: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Mark a step as completed."""
|
|
try:
|
|
logger.info(f"[complete_step] Completing step {step_number}")
|
|
user_id = str(current_user.get('clerk_user_id') or current_user.get('id'))
|
|
|
|
# Optional validation
|
|
try:
|
|
from services.validation import validate_step_data
|
|
logger.info(f"[complete_step] Validating step {step_number} with data: {request_data}")
|
|
validation_errors = validate_step_data(step_number, request_data)
|
|
if validation_errors:
|
|
logger.warning(f"[complete_step] Step {step_number} validation failed: {validation_errors}")
|
|
raise HTTPException(status_code=400, detail=f"Step validation failed: {'; '.join(validation_errors)}")
|
|
except ImportError:
|
|
pass
|
|
|
|
db = next(get_db(current_user))
|
|
|
|
save_errors = [] # Track save failures
|
|
|
|
# Step-specific side effects: save data to DB
|
|
if step_number == 1 and request_data:
|
|
# Step 1: Save API keys
|
|
step_data = request_data.get('data') or request_data
|
|
logger.info(f"🔍 Step 1: Raw request_data keys: {list(request_data.keys()) if request_data else 'None'}")
|
|
logger.info(f"🔍 Step 1: Extracted step_data keys: {list(step_data.keys()) if step_data else 'None'}")
|
|
api_keys = step_data.get('api_keys', {})
|
|
logger.info(f"🔍 Step 1: API keys found: {list(api_keys.keys()) if api_keys else 'None'}")
|
|
if api_keys:
|
|
for provider, key in api_keys.items():
|
|
if key:
|
|
try:
|
|
saved = self._save_api_key(user_id, provider, key, db)
|
|
if saved:
|
|
logger.info(f"✅ Saved API key for provider {provider}")
|
|
except Exception as e:
|
|
logger.error(f"❌ BLOCKING ERROR: Failed to save API key for provider {provider}: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Failed to save API key for {provider}. Onboarding cannot proceed until this is resolved."
|
|
) from e
|
|
|
|
# Step 2: Save website analysis data
|
|
elif step_number == 2 and request_data:
|
|
website_data = request_data.get('data') or request_data
|
|
logger.info(f"🔍 Step 2: Raw request_data keys: {list(request_data.keys()) if request_data else 'None'}")
|
|
logger.info(f"🔍 Step 2: Extracted website_data keys: {list(website_data.keys()) if website_data else 'None'}")
|
|
if website_data:
|
|
try:
|
|
saved = self._save_website_analysis(user_id, website_data, db)
|
|
if saved:
|
|
logger.info(f"✅ Saved website analysis for user {user_id}")
|
|
|
|
# Trigger Advertools persona augmentation (Phase 1)
|
|
try:
|
|
from services.scheduler import get_scheduler
|
|
|
|
website_url = website_data.get('website') or website_data.get('website_url')
|
|
if website_url:
|
|
scheduler = get_scheduler()
|
|
# Schedule content audit for persona augmentation
|
|
scheduler.schedule_one_time_task(
|
|
func=scheduler.execute_task_by_type,
|
|
run_date=datetime.utcnow() + timedelta(seconds=10), # Start in 10s
|
|
job_id=f"advertools_persona_augmentation_{user_id}",
|
|
kwargs={
|
|
"task_type": "advertools_intelligence",
|
|
"user_id": user_id,
|
|
"payload": {
|
|
"type": "content_audit",
|
|
"website_url": website_url
|
|
}
|
|
}
|
|
)
|
|
logger.info(f"🚀 Triggered Advertools persona augmentation for {website_url}")
|
|
except Exception as sched_err:
|
|
logger.error(f"Failed to trigger Advertools augmentation: {sched_err}")
|
|
except Exception as e:
|
|
logger.error(f"❌ BLOCKING ERROR: Failed to save website analysis: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail="Failed to save website analysis data. Onboarding cannot proceed until this is resolved."
|
|
) from e
|
|
|
|
# Step 3: Save research preferences data
|
|
elif step_number == 3 and request_data:
|
|
research_data = request_data.get('data') or request_data
|
|
logger.info(f"🔍 Step 3: Raw request_data keys: {list(request_data.keys()) if request_data else 'None'}")
|
|
logger.info(f"🔍 Step 3: Extracted research_data keys: {list(research_data.keys()) if research_data else 'None'}")
|
|
if research_data:
|
|
try:
|
|
saved = self._save_research_preferences(user_id, research_data, db)
|
|
if saved:
|
|
logger.info(f"✅ Saved research preferences for user {user_id}")
|
|
|
|
# Also save competitors if present
|
|
competitors = research_data.get('competitors')
|
|
if competitors:
|
|
industry_context = research_data.get('industryContext') or research_data.get('industry_context')
|
|
logger.info(f"🔍 Step 3: Found {len(competitors)} competitors to save")
|
|
self._save_competitor_analysis(user_id, competitors, industry_context, db)
|
|
|
|
# Save social media presence if available (Update WebsiteAnalysis)
|
|
social_media = research_data.get('social_media_accounts')
|
|
if social_media:
|
|
logger.info(f"🔍 Step 3: Found social media accounts to save")
|
|
try:
|
|
session = self._get_or_create_session(user_id, db)
|
|
existing_analysis = db.query(WebsiteAnalysis).filter(
|
|
WebsiteAnalysis.session_id == session.id
|
|
).first()
|
|
if existing_analysis:
|
|
existing_analysis.social_media_presence = social_media
|
|
existing_analysis.updated_at = datetime.utcnow()
|
|
db.commit()
|
|
logger.info(f"✅ Updated social media presence for user {user_id}")
|
|
else:
|
|
logger.warning(f"⚠️ Could not save social media: WebsiteAnalysis not found for user {user_id}")
|
|
except Exception as e:
|
|
logger.error(f"❌ Failed to save social media presence: {str(e)}")
|
|
# Don't block completion for this, as it's secondary data
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ BLOCKING ERROR: Failed to save research preferences: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail="Failed to save research preferences. Onboarding cannot proceed until this is resolved."
|
|
) from e
|
|
|
|
# Step 4: Save persona data
|
|
elif step_number == 4 and request_data:
|
|
persona_data = request_data.get('data') or request_data
|
|
logger.info(f"🔍 Step 4: Raw request_data keys: {list(request_data.keys()) if request_data else 'None'}")
|
|
logger.info(f"🔍 Step 4: Extracted persona_data keys: {list(persona_data.keys()) if persona_data else 'None'}")
|
|
if persona_data:
|
|
try:
|
|
saved = self._save_persona_data(user_id, persona_data, db)
|
|
if saved:
|
|
logger.info(f"✅ Saved persona data for user {user_id}")
|
|
except Exception as e:
|
|
logger.error(f"❌ BLOCKING ERROR: Failed to save persona data: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail="Failed to save persona data. Onboarding cannot proceed until this is resolved."
|
|
) from e
|
|
|
|
|
|
# Step 5: Save integrations data to flat context
|
|
elif step_number == 5 and request_data:
|
|
step5_data = request_data.get('data') or request_data
|
|
logger.info(f"🔍 Step 5: Raw request_data keys: {list(request_data.keys()) if request_data else 'None'}")
|
|
logger.info(f"🔍 Step 5: Extracted step5_data keys: {list(step5_data.keys()) if step5_data else 'None'}")
|
|
if step5_data:
|
|
saved = self._save_step5_integrations_context(user_id, step5_data)
|
|
if saved:
|
|
logger.info(f"✅ Saved Step 5 integrations context for user {user_id}")
|
|
else:
|
|
logger.warning(f"⚠️ Step 5 integrations context not persisted for user {user_id}")
|
|
|
|
# Persist current step and progress in DB
|
|
from services.onboarding.progress_service import OnboardingProgressService
|
|
progress_service = OnboardingProgressService()
|
|
progress_service.update_step(user_id, step_number)
|
|
try:
|
|
progress_pct = min(100.0, round((step_number / 6) * 100))
|
|
progress_service.update_progress(user_id, float(progress_pct))
|
|
except Exception as e:
|
|
logger.warning(f"Failed to update progress: {e}")
|
|
|
|
# Log save errors but don't block step completion (non-blocking)
|
|
if save_errors:
|
|
logger.warning(f"⚠️ Step {step_number} completed but some data save operations failed: {save_errors}")
|
|
|
|
# Refresh SSOT (Canonical Profile) - non-blocking try/except inside method
|
|
if not save_errors:
|
|
await self.integration_service.refresh_integrated_data(user_id, db)
|
|
|
|
logger.info(f"[complete_step] Step {step_number} persisted to DB for user {user_id}")
|
|
return {
|
|
"message": "Step completed successfully",
|
|
"step_number": step_number,
|
|
"data": request_data or {},
|
|
"warnings": save_errors if save_errors else None # Include warnings in response
|
|
}
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error completing step: {str(e)}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
raise HTTPException(status_code=500, detail="Internal server error")
|
|
|
|
async def skip_step(self, step_number: int, current_user: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Skip a step (for optional steps)."""
|
|
try:
|
|
from services.onboarding.api_key_manager import get_onboarding_progress_for_user
|
|
user_id = str(current_user.get('clerk_user_id') or current_user.get('id'))
|
|
progress = get_onboarding_progress_for_user(user_id)
|
|
step = progress.get_step_data(step_number)
|
|
|
|
if not step:
|
|
raise HTTPException(status_code=404, detail=f"Step {step_number} not found")
|
|
|
|
# Mark step as skipped
|
|
progress.mark_step_skipped(step_number)
|
|
|
|
return {
|
|
"message": f"Step {step_number} skipped successfully",
|
|
"step_number": step_number
|
|
}
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error skipping step: {str(e)}")
|
|
raise HTTPException(status_code=500, detail="Internal server error")
|
|
|
|
async def validate_step_access(self, step_number: int, current_user: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Validate if user can access a specific step."""
|
|
try:
|
|
user_id = str(current_user.get('clerk_user_id') or current_user.get('id'))
|
|
progress = get_onboarding_progress_for_user(user_id)
|
|
|
|
if not progress.can_proceed_to_step(step_number):
|
|
return {
|
|
"can_proceed": False,
|
|
"validation_errors": [f"Cannot proceed to step {step_number}. Complete previous steps first."],
|
|
"step_status": "locked"
|
|
}
|
|
|
|
return {
|
|
"can_proceed": True,
|
|
"validation_errors": [],
|
|
"step_status": "available"
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error validating step access: {str(e)}")
|
|
raise HTTPException(status_code=500, detail="Internal server error")
|