Add competitor_analysis fallback for deep competitor task scheduling

This commit is contained in:
ي
2026-03-09 12:07:18 +05:30
parent 5d49351c2d
commit a19a18d9b4
2 changed files with 318 additions and 7 deletions

View File

@@ -6,6 +6,7 @@ Handles the complex logic for completing the onboarding process.
from typing import Dict, Any, List
from datetime import datetime, timedelta
import os
from urllib.parse import urlparse
from fastapi import HTTPException
from loguru import logger
@@ -21,6 +22,84 @@ class OnboardingCompletionService:
def __init__(self):
# Pre-requisite steps; step 6 is the finalization itself
self.required_steps = [1, 2, 3, 4, 5]
def _normalize_competitor_analysis_for_deep_task(self, competitors: Any) -> List[Dict[str, Any]]:
"""Normalize Step 3 competitor analysis records to deep-task competitor schema."""
if not isinstance(competitors, list):
return []
normalized: List[Dict[str, Any]] = []
seen_domains = set()
for competitor in competitors:
if isinstance(competitor, str):
raw_url = competitor
raw_domain = ""
name = ""
summary = ""
elif isinstance(competitor, dict):
raw_url = (
competitor.get("competitor_url")
or competitor.get("url")
or competitor.get("website_url")
or competitor.get("competitor_domain")
or competitor.get("domain")
or ""
)
raw_domain = competitor.get("competitor_domain") or competitor.get("domain") or ""
name = competitor.get("name") or competitor.get("title") or ""
summary = competitor.get("summary") or competitor.get("description") or ""
analysis_data = competitor.get("analysis_data")
if isinstance(analysis_data, dict):
name = name or analysis_data.get("name") or analysis_data.get("title") or ""
summary = summary or analysis_data.get("summary") or analysis_data.get("description") or ""
else:
continue
url = self._normalize_competitor_url(raw_url)
if not url:
url = self._normalize_competitor_url(raw_domain)
if not url:
continue
domain = self._extract_domain_from_url(url)
if not domain or domain in seen_domains:
continue
seen_domains.add(domain)
normalized.append({
"url": url,
"domain": domain,
"name": name or domain,
"summary": summary,
})
return normalized
def _normalize_competitor_url(self, raw: Any) -> str:
if not isinstance(raw, str):
return ""
value = raw.strip()
if not value:
return ""
if not value.startswith(("http://", "https://")):
value = f"https://{value}"
parsed = urlparse(value)
if not parsed.scheme or not parsed.netloc:
return ""
return f"{parsed.scheme}://{parsed.netloc}"
def _extract_domain_from_url(self, url: str) -> str:
parsed = urlparse(url)
domain = (parsed.netloc or "").lower()
if domain.startswith("www."):
domain = domain[4:]
return domain
async def complete_onboarding(self, current_user: Dict[str, Any]) -> Dict[str, Any]:
"""Complete the onboarding process with full validation."""
@@ -233,11 +312,22 @@ class OnboardingCompletionService:
try:
research_prefs = integrated_data.get("research_preferences", {}) if isinstance(integrated_data, dict) else {}
competitors = research_prefs.get("competitors") if isinstance(research_prefs, dict) else None
# Fallback: Check competitor_analysis (Step 3 persistence) if not in preferences
if not competitors or not isinstance(competitors, list) or len(competitors) == 0:
competitors = integrated_data.get("competitor_analysis") if isinstance(integrated_data, dict) else None
research_competitors = research_prefs.get("competitors") if isinstance(research_prefs, dict) else None
competitor_analysis = integrated_data.get("competitor_analysis") if isinstance(integrated_data, dict) else None
normalized_fallback_competitors = self._normalize_competitor_analysis_for_deep_task(competitor_analysis)
selected_source = "research_preferences"
competitors = research_competitors
if not isinstance(competitors, list) or len(competitors) == 0:
competitors = normalized_fallback_competitors
selected_source = "competitor_analysis"
logger.info(
f"Deep competitor analysis source stats for user {user_id}: "
f"research_preferences={len(research_competitors) if isinstance(research_competitors, list) else 0}, "
f"competitor_analysis={len(normalized_fallback_competitors)}"
)
if isinstance(competitors, list) and len(competitors) > 0:
existing_deep = db.query(DeepCompetitorAnalysisTask).filter(
@@ -272,12 +362,13 @@ class OnboardingCompletionService:
db.commit()
logger.info(
f"Scheduled deep competitor analysis for user {user_id} "
f"({website_url}) at {next_execution.isoformat()} with {len(competitors)} competitors"
f"({website_url}) at {next_execution.isoformat()} with {len(competitors)} competitors "
f"from source={selected_source}"
)
else:
logger.warning(
f"Deep competitor analysis not scheduled for user {user_id}: "
f"no Step 3 competitors available"
f"no competitors available from research_preferences or competitor_analysis"
)
except Exception as e:
logger.warning(f"Failed to schedule deep competitor analysis for user {user_id}: {e}")

View File

@@ -0,0 +1,220 @@
from pathlib import Path
import importlib.util
import sys
import types
import pytest
class _FakeQuery:
def filter(self, *args, **kwargs):
return self
def order_by(self, *args, **kwargs):
return self
def first(self):
return None
def all(self):
return []
class _FakeDB:
def __init__(self):
self.added = []
def query(self, model):
return _FakeQuery()
def add(self, obj):
self.added.append(obj)
def commit(self):
return None
def close(self):
return None
class _TaskBase:
def __init__(self, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
class _OnboardingFullWebsiteAnalysisTask(_TaskBase):
    """Test double registered as ``OnboardingFullWebsiteAnalysisTask`` in the stubbed models module."""

    # Class-level placeholders; presumably they stand in for the model's
    # column attributes referenced in query filters -- TODO confirm.
    user_id = "user_id"
    website_url = "website_url"
class _DeepCompetitorAnalysisTask(_TaskBase):
    """Test double registered as ``DeepCompetitorAnalysisTask`` in the stubbed models module."""

    # Class-level placeholders; presumably they stand in for the model's
    # column attributes referenced in query filters -- TODO confirm.
    user_id = "user_id"
    website_url = "website_url"
class _SIFIndexingTask(_TaskBase):
    """Test double registered as ``SIFIndexingTask`` in the stubbed models module."""

    # Class-level placeholders; presumably they stand in for the model's
    # column attributes referenced in query filters -- TODO confirm.
    user_id = "user_id"
    website_url = "website_url"
class _MarketTrendsTask(_TaskBase):
    """Test double registered as ``MarketTrendsTask`` in the stubbed models module."""

    # Class-level placeholders; presumably they stand in for the model's
    # column attributes referenced in query filters -- TODO confirm.
    user_id = "user_id"
    website_url = "website_url"
class _FakeIntegrationService:
def __init__(self, integrated_data):
self._integrated_data = integrated_data
def get_integrated_data_sync(self, user_id, db):
return self._integrated_data
def _install_stub_module(name, **attrs):
mod = types.ModuleType(name)
for key, value in attrs.items():
setattr(mod, key, value)
sys.modules[name] = mod
return mod
def _install_module_chain(name):
parts = name.split('.')
for idx in range(1, len(parts) + 1):
sub = '.'.join(parts[:idx])
if sub not in sys.modules:
sys.modules[sub] = types.ModuleType(sub)
def _load_service_module(fake_db, integrated_data):
    """Load the onboarding completion service from disk with stubbed dependencies.

    Stub modules are registered in ``sys.modules`` under the project module
    names listed below, then the service file is executed as a fresh module
    named ``onboarding_completion_service_under_test``.

    NOTE: the stubs stay registered in ``sys.modules`` after this returns.

    Args:
        fake_db: Object returned by the stubbed ``get_session_for_user`` and
            ``SessionLocal`` factories.
        integrated_data: Payload served by the stubbed
            ``OnboardingDataIntegrationService``.

    Returns:
        The executed service module.

    Raises:
        ImportError: If no import spec/loader can be built for the service file.
    """
    # Module name -> attributes exposed by its stub, in installation order.
    stub_modules = {
        "api.content_planning.services.content_strategy.onboarding": {
            "OnboardingDataIntegrationService": lambda: _FakeIntegrationService(integrated_data),
        },
        "services.database": {
            "get_session_for_user": lambda user_id: fake_db,
            "SessionLocal": lambda: fake_db,
        },
        "services.persona_analysis_service": {
            "PersonaAnalysisService": type("PersonaAnalysisService", (), {}),
        },
        "services.research.research_persona_scheduler": {
            "schedule_research_persona_generation": lambda *args, **kwargs: None,
        },
        "services.persona.facebook.facebook_persona_scheduler": {
            "schedule_facebook_persona_generation": lambda *args, **kwargs: None,
        },
        "services.onboarding.progress_service": {
            "OnboardingProgressService": type(
                "OnboardingProgressService",
                (),
                {
                    "complete_onboarding": lambda self, user_id: True,
                    "get_onboarding_status": lambda self, user_id: {"current_step": 5},
                },
            ),
        },
        "services.progressive_setup_service": {
            "ProgressiveSetupService": type(
                "ProgressiveSetupService",
                (),
                {
                    "__init__": lambda self, db: None,
                    "initialize_user_environment": lambda self, user_id: None,
                },
            ),
        },
        "services.website_analysis_monitoring_service": {
            "schedule_website_analysis_task_creation": lambda **kwargs: None,
            "clerk_user_id_to_int": lambda user_id: 1,
        },
        "models.website_analysis_monitoring_models": {
            "OnboardingFullWebsiteAnalysisTask": _OnboardingFullWebsiteAnalysisTask,
            "DeepCompetitorAnalysisTask": _DeepCompetitorAnalysisTask,
            "SIFIndexingTask": _SIFIndexingTask,
            "MarketTrendsTask": _MarketTrendsTask,
        },
    }

    # Register every parent package first, then drop the populated stubs in.
    for module_name in stub_modules:
        _install_module_chain(module_name)
    for module_name, attrs in stub_modules.items():
        _install_stub_module(module_name, **attrs)

    service_path = Path(__file__).resolve().parent / "api" / "onboarding_utils" / "onboarding_completion_service.py"
    spec = importlib.util.spec_from_file_location("onboarding_completion_service_under_test", service_path)
    # Validate before use: module_from_spec() needs a real spec, and
    # exec_module() needs its loader.
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot build an import spec for {service_path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module
@pytest.mark.asyncio
async def test_complete_onboarding_schedules_deep_competitor_task_from_competitor_analysis_fallback(monkeypatch):
    """With no research-preference competitors, the deep task uses competitor_analysis."""
    fake_db = _FakeDB()
    website_analysis = {
        "website_url": "https://example.com",
        "updated_at": "2026-01-01T00:00:00",
    }
    step3_record = {
        "competitor_url": "acme-competitor.com/path/page",
        "competitor_domain": "acme-competitor.com",
        "analysis_data": {"description": "Strong content engine"},
    }
    integrated_data = {
        "website_analysis": website_analysis,
        # Empty on purpose so the service has to fall back.
        "research_preferences": {"competitors": []},
        "competitor_analysis": [step3_record],
    }

    module = _load_service_module(fake_db, integrated_data)
    service = module.OnboardingCompletionService()

    async def _no_missing_steps(*args, **kwargs):
        return []

    async def _api_keys_ok(*args, **kwargs):
        return None

    async def _skip_persona(*args, **kwargs):
        return False

    # Replace validation and persona generation on the class with async no-ops.
    replacements = (
        ("_validate_required_steps_database", _no_missing_steps),
        ("_validate_api_keys", _api_keys_ok),
        ("_generate_persona_from_onboarding", _skip_persona),
    )
    for method_name, replacement in replacements:
        monkeypatch.setattr(module.OnboardingCompletionService, method_name, replacement)

    result = await service.complete_onboarding({"id": "user-1"})
    assert result["message"] == "Onboarding completed successfully"

    deep_tasks = [obj for obj in fake_db.added if isinstance(obj, _DeepCompetitorAnalysisTask)]
    assert len(deep_tasks) == 1

    expected_competitors = [
        {
            "url": "https://acme-competitor.com",
            "domain": "acme-competitor.com",
            "name": "acme-competitor.com",
            "summary": "Strong content engine",
        }
    ]
    assert deep_tasks[0].payload["competitors"] == expected_competitors