From a19a18d9b4abc06fefde4f46840c5ef14c975b59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D9=8A?= Date: Mon, 9 Mar 2026 12:07:18 +0530 Subject: [PATCH] Add competitor_analysis fallback for deep competitor task scheduling --- .../onboarding_completion_service.py | 105 ++++++++- ...egression_onboarding_completion_service.py | 220 ++++++++++++++++++ 2 files changed, 318 insertions(+), 7 deletions(-) create mode 100644 backend/regression_onboarding_completion_service.py diff --git a/backend/api/onboarding_utils/onboarding_completion_service.py b/backend/api/onboarding_utils/onboarding_completion_service.py index 2e479889..c1c37684 100644 --- a/backend/api/onboarding_utils/onboarding_completion_service.py +++ b/backend/api/onboarding_utils/onboarding_completion_service.py @@ -6,6 +6,7 @@ Handles the complex logic for completing the onboarding process. from typing import Dict, Any, List from datetime import datetime, timedelta import os +from urllib.parse import urlparse from fastapi import HTTPException from loguru import logger @@ -21,6 +22,84 @@ class OnboardingCompletionService: def __init__(self): # Pre-requisite steps; step 6 is the finalization itself self.required_steps = [1, 2, 3, 4, 5] + + def _normalize_competitor_analysis_for_deep_task(self, competitors: Any) -> List[Dict[str, Any]]: + """Normalize Step 3 competitor analysis records to deep-task competitor schema.""" + if not isinstance(competitors, list): + return [] + + normalized: List[Dict[str, Any]] = [] + seen_domains = set() + + for competitor in competitors: + if isinstance(competitor, str): + raw_url = competitor + raw_domain = "" + name = "" + summary = "" + elif isinstance(competitor, dict): + raw_url = ( + competitor.get("competitor_url") + or competitor.get("url") + or competitor.get("website_url") + or competitor.get("competitor_domain") + or competitor.get("domain") + or "" + ) + raw_domain = competitor.get("competitor_domain") or competitor.get("domain") or "" + name = competitor.get("name") or competitor.get("title") or "" + summary = competitor.get("summary") or competitor.get("description") or "" + + analysis_data = competitor.get("analysis_data") + if isinstance(analysis_data, dict): + name = name or analysis_data.get("name") or analysis_data.get("title") or "" + summary = summary or analysis_data.get("summary") or analysis_data.get("description") or "" + else: + continue + + url = self._normalize_competitor_url(raw_url) + if not url: + url = self._normalize_competitor_url(raw_domain) + if not url: + continue + + domain = self._extract_domain_from_url(url) + if not domain or domain in seen_domains: + continue + + seen_domains.add(domain) + normalized.append({ + "url": url, + "domain": domain, + "name": name or domain, + "summary": summary, + }) + + return normalized + + def _normalize_competitor_url(self, raw: Any) -> str: + if not isinstance(raw, str): + return "" + + value = raw.strip() + if not value: + return "" + + if not value.startswith(("http://", "https://")): + value = f"https://{value}" + + parsed = urlparse(value) + if not parsed.scheme or not parsed.netloc: + return "" + + return f"{parsed.scheme}://{parsed.netloc}" + + def _extract_domain_from_url(self, url: str) -> str: + parsed = urlparse(url) + domain = (parsed.netloc or "").lower() + if domain.startswith("www."): + domain = domain[4:] + return domain async def complete_onboarding(self, current_user: Dict[str, Any]) -> Dict[str, Any]: """Complete the onboarding process with full validation.""" @@ -233,11 +312,22 @@ class OnboardingCompletionService: try: research_prefs = integrated_data.get("research_preferences", {}) if isinstance(integrated_data, dict) else {} - competitors = research_prefs.get("competitors") if isinstance(research_prefs, dict) else None - - # Fallback: Check competitor_analysis (Step 3 persistence) if not in preferences - if not competitors or not isinstance(competitors, list) or len(competitors) == 0: - competitors = integrated_data.get("competitor_analysis") if isinstance(integrated_data, dict) else None + research_competitors = research_prefs.get("competitors") if isinstance(research_prefs, dict) else None + + competitor_analysis = integrated_data.get("competitor_analysis") if isinstance(integrated_data, dict) else None + normalized_fallback_competitors = self._normalize_competitor_analysis_for_deep_task(competitor_analysis) + + selected_source = "research_preferences" + competitors = research_competitors + if not isinstance(competitors, list) or len(competitors) == 0: + competitors = normalized_fallback_competitors + selected_source = "competitor_analysis" + + logger.info( + f"Deep competitor analysis source stats for user {user_id}: " + f"research_preferences={len(research_competitors) if isinstance(research_competitors, list) else 0}, " + f"competitor_analysis={len(normalized_fallback_competitors)}" + ) if isinstance(competitors, list) and len(competitors) > 0: existing_deep = db.query(DeepCompetitorAnalysisTask).filter( @@ -272,12 +362,13 @@ class OnboardingCompletionService: db.commit() logger.info( f"Scheduled deep competitor analysis for user {user_id} " - f"({website_url}) at {next_execution.isoformat()} with {len(competitors)} competitors" + f"({website_url}) at {next_execution.isoformat()} with {len(competitors)} competitors " + f"from source={selected_source}" ) else: logger.warning( f"Deep competitor analysis not scheduled for user {user_id}: " - f"no Step 3 competitors available" + f"no competitors available from research_preferences or competitor_analysis" ) except Exception as e: logger.warning(f"Failed to schedule deep competitor analysis for user {user_id}: {e}") diff --git a/backend/regression_onboarding_completion_service.py b/backend/regression_onboarding_completion_service.py new file mode 100644 index 00000000..916a7682 --- /dev/null +++ b/backend/regression_onboarding_completion_service.py @@ -0,0 +1,220 @@ +from pathlib import Path +import importlib.util +import sys +import types + +import pytest + + +class _FakeQuery: + def filter(self, *args, **kwargs): + return self + + def order_by(self, *args, **kwargs): + return self + + def first(self): + return None + + def all(self): + return [] + + +class _FakeDB: + def __init__(self): + self.added = [] + + def query(self, model): + return _FakeQuery() + + def add(self, obj): + self.added.append(obj) + + def commit(self): + return None + + def close(self): + return None + + +class _TaskBase: + def __init__(self, **kwargs): + for key, value in kwargs.items(): + setattr(self, key, value) + + +class _OnboardingFullWebsiteAnalysisTask(_TaskBase): + user_id = "user_id" + website_url = "website_url" + + +class _DeepCompetitorAnalysisTask(_TaskBase): + user_id = "user_id" + website_url = "website_url" + + +class _SIFIndexingTask(_TaskBase): + user_id = "user_id" + website_url = "website_url" + + +class _MarketTrendsTask(_TaskBase): + user_id = "user_id" + website_url = "website_url" + + +class _FakeIntegrationService: + def __init__(self, integrated_data): + self._integrated_data = integrated_data + + def get_integrated_data_sync(self, user_id, db): + return self._integrated_data + + +def _install_stub_module(name, **attrs): + mod = types.ModuleType(name) + for key, value in attrs.items(): + setattr(mod, key, value) + sys.modules[name] = mod + return mod + + +def _install_module_chain(name): + parts = name.split('.') + for idx in range(1, len(parts) + 1): + sub = '.'.join(parts[:idx]) + if sub not in sys.modules: + sys.modules[sub] = types.ModuleType(sub) + + +def _load_service_module(fake_db, integrated_data): + _install_module_chain("api.content_planning.services.content_strategy.onboarding") + _install_module_chain("services.database") + _install_module_chain("services.persona_analysis_service") + _install_module_chain("services.research.research_persona_scheduler") + _install_module_chain("services.persona.facebook.facebook_persona_scheduler") + _install_module_chain("services.onboarding.progress_service") + _install_module_chain("services.progressive_setup_service") + _install_module_chain("services.website_analysis_monitoring_service") + _install_module_chain("models.website_analysis_monitoring_models") + + _install_stub_module( + "api.content_planning.services.content_strategy.onboarding", + OnboardingDataIntegrationService=lambda: _FakeIntegrationService(integrated_data), + ) + + _install_stub_module( + "services.database", + get_session_for_user=lambda user_id: fake_db, + SessionLocal=lambda: fake_db, + ) + + _install_stub_module( + "services.persona_analysis_service", + PersonaAnalysisService=type("PersonaAnalysisService", (), {}), + ) + + _install_stub_module( + "services.research.research_persona_scheduler", + schedule_research_persona_generation=lambda *args, **kwargs: None, + ) + + _install_stub_module( + "services.persona.facebook.facebook_persona_scheduler", + schedule_facebook_persona_generation=lambda *args, **kwargs: None, + ) + + _install_stub_module( + "services.onboarding.progress_service", + OnboardingProgressService=type( + "OnboardingProgressService", + (), + { + "complete_onboarding": lambda self, user_id: True, + "get_onboarding_status": lambda self, user_id: {"current_step": 5}, + }, + ), + ) + + _install_stub_module( + "services.progressive_setup_service", + ProgressiveSetupService=type( + "ProgressiveSetupService", + (), + { + "__init__": lambda self, db: None, + "initialize_user_environment": lambda self, user_id: None, + }, + ), + ) + + _install_stub_module( + "services.website_analysis_monitoring_service", + schedule_website_analysis_task_creation=lambda **kwargs: None, + clerk_user_id_to_int=lambda user_id: 1, + ) + + _install_stub_module( + "models.website_analysis_monitoring_models", + OnboardingFullWebsiteAnalysisTask=_OnboardingFullWebsiteAnalysisTask, + DeepCompetitorAnalysisTask=_DeepCompetitorAnalysisTask, + SIFIndexingTask=_SIFIndexingTask, + MarketTrendsTask=_MarketTrendsTask, + ) + + service_path = Path(__file__).resolve().parent / "api" / "onboarding_utils" / "onboarding_completion_service.py" + spec = importlib.util.spec_from_file_location("onboarding_completion_service_under_test", service_path) + module = importlib.util.module_from_spec(spec) + assert spec and spec.loader + spec.loader.exec_module(module) + return module + + +@pytest.mark.asyncio +async def test_complete_onboarding_schedules_deep_competitor_task_from_competitor_analysis_fallback(monkeypatch): + fake_db = _FakeDB() + integrated_data = { + "website_analysis": { + "website_url": "https://example.com", + "updated_at": "2026-01-01T00:00:00", + }, + "research_preferences": {"competitors": []}, + "competitor_analysis": [ + { + "competitor_url": "acme-competitor.com/path/page", + "competitor_domain": "acme-competitor.com", + "analysis_data": {"description": "Strong content engine"}, + } + ], + } + + module = _load_service_module(fake_db, integrated_data) + service = module.OnboardingCompletionService() + + async def _validate_steps(*args, **kwargs): + return [] + + async def _validate_api_keys(*args, **kwargs): + return None + + async def _generate_persona(*args, **kwargs): + return False + + monkeypatch.setattr(module.OnboardingCompletionService, "_validate_required_steps_database", _validate_steps) + monkeypatch.setattr(module.OnboardingCompletionService, "_validate_api_keys", _validate_api_keys) + monkeypatch.setattr(module.OnboardingCompletionService, "_generate_persona_from_onboarding", _generate_persona) + + result = await service.complete_onboarding({"id": "user-1"}) + + assert result["message"] == "Onboarding completed successfully" + + deep_tasks = [obj for obj in fake_db.added if isinstance(obj, _DeepCompetitorAnalysisTask)] + assert len(deep_tasks) == 1 + assert deep_tasks[0].payload["competitors"] == [ + { + "url": "https://acme-competitor.com", + "domain": "acme-competitor.com", + "name": "acme-competitor.com", + "summary": "Strong content engine", + } + ]