"""
SIF Phase 1 Integration: Onboarding Step 3 Enhancement

Integrates semantic intelligence capabilities into ALwrity's onboarding step 3.
This module enhances competitor discovery and content analysis with txtai-powered
semantic understanding.
"""

import asyncio
import json
from typing import List, Dict, Any, Optional, Tuple
from urllib.parse import urlparse
from loguru import logger
from datetime import datetime

# Import existing ALwrity services
from api.onboarding_utils.step3_research_service import Step3ResearchService
from services.research.exa_service import ExaService
from services.seo.competitive_analyzer import CompetitiveAnalyzer

# Import SIF framework
from services.intelligence.txtai_service import TxtaiIntelligenceService
from services.intelligence.harvester import SemanticHarvesterService
from services.intelligence.agents import (
    StrategyArchitectAgent,
    ContentGuardianAgent,
    LinkGraphAgent
)


class SIFOnboardingIntegration:
    """
    Phase 1: Semantic Intelligence Integration for Onboarding Step 3

    Enhances competitor discovery and content analysis with semantic understanding.
    The public entry point is :meth:`enhance_competitor_discovery`; every other
    method is a private step of that pipeline.
    """

    # Maximum number of pages harvested from the user's own website.
    USER_PAGE_LIMIT = 20
    # Only the first N semantic competitors (with a usable URL) are harvested.
    MAX_COMPETITORS_TO_HARVEST = 5
    # Maximum number of pages harvested per competitor site.
    PAGES_PER_COMPETITOR = 10
    # Density difference above which a theme is attributed to one side.
    THEME_LEAD_MARGIN = 0.05
    # Number of ranked themes returned in the theme analysis.
    TOP_THEMES_LIMIT = 8

    def __init__(self, user_id: str, db_session=None):
        """
        Wire up the research services, the SIF components and the agents.

        Args:
            user_id: Identifier passed to the intelligence service and agents.
            db_session: Optional database session. When provided, a
                CompetitiveAnalyzer is created with it; otherwise
                ``competitive_analyzer`` is ``None``.
        """
        self.user_id = user_id
        self.research_service = Step3ResearchService()
        self.exa_service = ExaService()

        # Optional database session for Phase 1 (can be added later).
        # CompetitiveAnalyzer is already imported at module level, so a second
        # guarded import here could never fail and has been dropped.
        self.db_session = db_session
        self.competitive_analyzer = (
            CompetitiveAnalyzer(db_session) if db_session is not None else None
        )

        # SIF components
        self.intelligence = TxtaiIntelligenceService(user_id)
        self.harvester = SemanticHarvesterService()

        # Initialize agents
        self.strategy_agent = StrategyArchitectAgent(self.intelligence, user_id)
        self.guardian_agent = ContentGuardianAgent(self.intelligence, user_id)
        self.link_agent = LinkGraphAgent(self.intelligence, user_id)

        logger.info(f"[SIFOnboarding] Initialized for user {user_id}")

    async def enhance_competitor_discovery(self, website_url: str, business_info: Dict[str, Any]) -> Dict[str, Any]:
        """
        Enhanced competitor discovery with semantic intelligence.

        Harvests and indexes the user's site, runs traditional and semantic
        competitor discovery, harvests and indexes competitor pages, then
        generates semantic insights. Any failure along the way falls back to
        traditional discovery only.

        Args:
            website_url: User's website URL
            business_info: Business information from onboarding

        Returns:
            Enhanced competitor analysis with semantic insights. The dict has
            the keys ``traditional_competitors``, ``semantic_competitors``,
            ``semantic_insights``, ``content_analysis`` and
            ``intelligence_status`` on both the success and fallback paths.
        """
        logger.info(f"[SIFOnboarding] Starting enhanced competitor discovery for {website_url}")

        try:
            # Step 1: Harvest user website content for semantic analysis
            logger.info(f"[SIFOnboarding] Harvesting user website content from {website_url}")
            user_content = await self.harvester.harvest_website(website_url, limit=self.USER_PAGE_LIMIT)

            if not user_content:
                logger.warning(f"[SIFOnboarding] No content harvested from {website_url}")
                return await self._fallback_to_traditional_discovery(
                    website_url, business_info, reason="no_user_content_harvested"
                )

            # Step 2: Index user content for semantic analysis
            logger.info(f"[SIFOnboarding] Indexing {len(user_content)} pages from user website")
            user_items = self._build_index_items(user_content, "user_content", "user_website")
            await self.intelligence.index_content(user_items)

            # Step 3: Traditional competitor discovery (existing ALwrity logic)
            logger.info("[SIFOnboarding] Running traditional competitor discovery")
            traditional_competitors = await self._get_traditional_competitors(website_url, business_info)

            # Step 4: Semantic competitor discovery using Exa AI
            logger.info("[SIFOnboarding] Running semantic competitor discovery")
            semantic_competitors = await self._discover_semantic_competitors(website_url, business_info)

            # Step 5: Harvest and analyze competitor content.
            # Results without a URL cannot be harvested, so they are skipped
            # before the cap is applied.
            competitor_urls = [
                comp["url"] for comp in semantic_competitors if comp.get("url")
            ][:self.MAX_COMPETITORS_TO_HARVEST]

            competitor_content: List[Dict[str, Any]] = []
            if competitor_urls:
                logger.info(f"[SIFOnboarding] Harvesting content from {len(competitor_urls)} semantic competitors")
                harvested = await self.harvester.harvest_competitors(
                    competitor_urls,
                    pages_per_competitor=self.PAGES_PER_COMPETITOR
                )
                # Normalise a None/empty result so the len() calls below cannot
                # raise and discard an otherwise successful analysis.
                competitor_content = harvested or []
            else:
                logger.info("[SIFOnboarding] No semantic competitor URLs to harvest")

            # Step 6: Index competitor content
            if competitor_content:
                logger.info(f"[SIFOnboarding] Indexing {len(competitor_content)} pages from competitors")
                competitor_items = self._build_index_items(
                    competitor_content,
                    "competitor_content",
                    "competitor_website",
                    include_competitor_name=True,
                )
                await self.intelligence.index_content(competitor_items)

            # Step 7: Generate semantic insights
            logger.info("[SIFOnboarding] Generating semantic insights")
            semantic_insights = await self._generate_semantic_insights(user_content, competitor_content)

            # Step 8: Combine traditional and semantic results
            enhanced_results = {
                "traditional_competitors": traditional_competitors,
                "semantic_competitors": semantic_competitors,
                "semantic_insights": semantic_insights,
                "content_analysis": {
                    "user_pages_analyzed": len(user_content),
                    "competitor_pages_analyzed": len(competitor_content),
                    "harvest_stats": self.harvester.get_harvest_stats()
                },
                "intelligence_status": self.intelligence.get_index_stats()
            }

            logger.success(f"[SIFOnboarding] Enhanced competitor discovery completed for user {self.user_id}")
            return enhanced_results

        except Exception as e:
            # Top-level boundary: degrade to the traditional flow rather than
            # failing the onboarding step.
            logger.error(f"[SIFOnboarding] Enhanced competitor discovery failed: {e}")
            logger.exception("Full traceback:")
            return await self._fallback_to_traditional_discovery(
                website_url, business_info, reason=f"{type(e).__name__}: {e}"
            )

    def _build_index_items(
        self,
        pages: List[Dict[str, Any]],
        content_type: str,
        source: str,
        include_competitor_name: bool = False,
    ) -> List[Tuple[str, str, Dict[str, Any]]]:
        """
        Convert harvested pages into ``(url, content, metadata)`` index tuples.

        Args:
            pages: Harvested pages; each must provide ``"url"`` and
                ``"content"`` and may provide ``"title"``.
            content_type: Value stored under the metadata ``"type"`` key.
            source: Value stored under the metadata ``"source"`` key.
            include_competitor_name: When true, the page's domain is stored
                under ``"competitor_name"``.

        Returns:
            One tuple per page, in input order.
        """
        items = []
        for page in pages:
            metadata = {
                "title": page.get("title", ""),
                "type": content_type,
                "source": source,
            }
            if include_competitor_name:
                metadata["competitor_name"] = self._extract_domain(page["url"])
            items.append((page["url"], page["content"], metadata))
        return items

    def _documents_with_role(self, documents: List[Dict[str, Any]], role: str) -> List[Dict[str, Any]]:
        """Return the documents whose metadata the strategy agent maps to ``role``."""
        return [
            doc for doc in documents
            # "or {}" guards against documents whose metadata is explicitly None.
            if self.strategy_agent._infer_document_role(doc.get("metadata") or {}) == role
        ]

    async def _generate_semantic_insights(self, user_content: List[Dict], competitor_content: List[Dict]) -> Dict[str, Any]:
        """
        Generate semantic insights using SIF agents.

        Args:
            user_content: Harvested user pages. Currently unused; the analysis
                reads from the index instead.
            competitor_content: Harvested competitor pages. Currently unused
                for the same reason.

        Returns:
            Dict with ``content_pillars``, ``semantic_gaps``,
            ``themes_analysis``, ``strategic_recommendations`` and
            ``confidence_scores``. On failure the same keys are returned with
            empty values plus an ``error`` message.
        """
        logger.info("[SIFOnboarding] Generating semantic insights")

        try:
            # Discover content pillars from user content
            content_pillars = await self.strategy_agent.discover_pillars()

            # Find semantic gaps (what competitors cover that user doesn't)
            indexed_documents = await self.strategy_agent._fetch_index_documents() or []
            competitor_doc_ids = [
                str(doc.get("id", ""))
                for doc in self._documents_with_role(indexed_documents, "competitor")
            ]
            semantic_gaps = await self.strategy_agent.find_semantic_gaps(competitor_indices=competitor_doc_ids)

            # Analyze content themes and topics
            themes_analysis = await self._analyze_content_themes(indexed_documents)

            # Generate strategic recommendations
            recommendations = await self._generate_strategic_recommendations(
                content_pillars, semantic_gaps, themes_analysis
            )

            return {
                "content_pillars": content_pillars,
                "semantic_gaps": semantic_gaps,
                "themes_analysis": themes_analysis,
                "strategic_recommendations": recommendations,
                "confidence_scores": {
                    "pillar_discovery": bool(content_pillars),
                    "gap_analysis": bool(semantic_gaps),
                    "theme_analysis": themes_analysis is not None
                }
            }

        except Exception as e:
            logger.error(f"[SIFOnboarding] Semantic insights generation failed: {e}")
            # Same keys as the success path so consumers need no special case.
            return {
                "content_pillars": [],
                "semantic_gaps": [],
                "themes_analysis": None,
                "strategic_recommendations": [],
                "confidence_scores": {
                    "pillar_discovery": False,
                    "gap_analysis": False,
                    "theme_analysis": False
                },
                "error": str(e)
            }

    async def _analyze_content_themes(self, indexed_documents: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
        """
        Analyze themes from indexed metadata instead of static literals.

        Args:
            indexed_documents: Documents fetched from the index; each may carry
                a ``"metadata"`` dict used to infer its role.

        Returns:
            A dict with the ranked ``top_themes`` and summary counts, or
            ``None`` when there are no user or competitor documents or the
            analysis fails.
        """
        logger.info("[SIFOnboarding] Analyzing content themes")

        try:
            if not indexed_documents:
                return None

            user_docs = self._documents_with_role(indexed_documents, "user")
            competitor_docs = self._documents_with_role(indexed_documents, "competitor")

            if not user_docs and not competitor_docs:
                return None

            user_theme_density = self.strategy_agent._extract_topic_density(user_docs)
            competitor_theme_density = self.strategy_agent._extract_topic_density(competitor_docs)

            all_topics = set(user_theme_density) | set(competitor_theme_density)
            ranked_themes = []

            # Set iteration order is not stable across runs; sorting first makes
            # the order of tied themes deterministic (the sort below is stable).
            for topic in sorted(all_topics, key=str):
                user_score = user_theme_density.get(topic, 0.0)
                competitor_score = competitor_theme_density.get(topic, 0.0)

                if competitor_score > user_score + self.THEME_LEAD_MARGIN:
                    classification = "competitor_led"
                elif user_score > competitor_score + self.THEME_LEAD_MARGIN:
                    classification = "user_led"
                else:
                    classification = "shared"

                ranked_themes.append({
                    "theme": topic,
                    "user_density": round(user_score, 4),
                    "competitor_density": round(competitor_score, 4),
                    "combined_relevance": round((user_score + competitor_score) / 2, 4),
                    "coverage_delta": round(competitor_score - user_score, 4),
                    "classification": classification,
                    "evidence": {
                        "user_sample_titles": self.strategy_agent._sample_titles_for_topic(user_docs, topic),
                        "competitor_sample_titles": self.strategy_agent._sample_titles_for_topic(competitor_docs, topic)
                    }
                })

            # Highest combined relevance first; larger coverage gaps break ties.
            ranked_themes.sort(
                key=lambda item: (item["combined_relevance"], abs(item["coverage_delta"])),
                reverse=True
            )

            return {
                "top_themes": ranked_themes[:self.TOP_THEMES_LIMIT],
                "total_themes_analyzed": len(ranked_themes),
                "user_theme_count": len(user_theme_density),
                "competitor_theme_count": len(competitor_theme_density),
                "theme_source": "indexed_metadata"
            }

        except Exception as e:
            logger.error(f"[SIFOnboarding] Theme analysis failed: {e}")
            return None

    async def _generate_strategic_recommendations(self, content_pillars: List[Dict], semantic_gaps: List[Dict], themes_analysis: Optional[Dict]) -> List[Dict[str, Any]]:
        """
        Generate strategic recommendations based on semantic analysis.

        Args:
            content_pillars: Pillars from the strategy agent; the first three
                become action items.
            semantic_gaps: Gaps from the strategy agent; the first five become
                action items and evidence entries.
            themes_analysis: Output of :meth:`_analyze_content_themes`, or
                ``None``.

        Returns:
            A list of recommendation dicts. A generic "strategic_overview"
            entry is always appended; on failure a single "error" entry is
            returned instead.
        """
        logger.info("[SIFOnboarding] Generating strategic recommendations")

        recommendations = []

        try:
            # Content pillar recommendations
            if content_pillars:
                recommendations.append({
                    "type": "content_pillars",
                    "priority": "high",
                    "title": "Focus on Core Content Pillars",
                    "description": f"Based on semantic analysis, focus on {len(content_pillars)} key content areas.",
                    "action_items": [
                        f"Develop comprehensive content for pillar: {pillar.get('pillar_id', 'Unknown')}"
                        for pillar in content_pillars[:3]
                    ]
                })

            # Semantic gap recommendations
            if semantic_gaps:
                top_gaps = semantic_gaps[:5]
                recommendations.append({
                    "type": "content_gaps",
                    "priority": "high",
                    "title": "Fill Content Gaps",
                    "description": f"Competitors are covering {len(semantic_gaps)} topics you haven't addressed.",
                    "action_items": [
                        (
                            f"Create content about: {gap.get('topic', 'Unknown topic')} "
                            f"({gap.get('priority', 'medium')} priority) - {gap.get('reason', 'Coverage gap identified')}"
                        )
                        for gap in top_gaps
                    ],
                    "evidence": [
                        {
                            "topic": gap.get("topic"),
                            "priority": gap.get("priority"),
                            "confidence": gap.get("confidence"),
                            "topic_density": gap.get("topic_density"),
                            # "or {}" tolerates an explicit evidence=None, which
                            # would otherwise discard every recommendation.
                            "competitor_sample_titles": (gap.get("evidence") or {}).get("competitor_sample_titles", [])
                        }
                        for gap in top_gaps
                    ]
                })

            # Theme-based recommendations
            top_themes = (themes_analysis or {}).get("top_themes") or []
            if top_themes:
                top_theme = top_themes[0]
                recommendations.append({
                    "type": "content_themes",
                    "priority": "medium",
                    "title": "Leverage High-Relevance Themes",
                    "description": f"Your content strongly relates to '{top_theme['theme']}' - consider expanding in this area.",
                    "action_items": [
                        f"Create in-depth guides about {top_theme['theme']}",
                        f"Develop case studies showing {top_theme['theme']} success",
                        f"Create comparison content for {top_theme['theme']} tools/approaches"
                    ]
                })

            # General strategic recommendations
            recommendations.append({
                "type": "strategic_overview",
                "priority": "medium",
                "title": "Strategic Content Approach",
                "description": "Based on semantic analysis of your competitive landscape",
                "action_items": [
                    "Focus on unique angles within your content pillars",
                    "Address identified content gaps systematically",
                    "Monitor competitor content themes for emerging opportunities",
                    "Develop thought leadership in your strongest semantic areas"
                ]
            })

            return recommendations

        except Exception as e:
            logger.error(f"[SIFOnboarding] Strategic recommendations generation failed: {e}")
            return [{
                "type": "error",
                "priority": "low",
                "title": "Analysis Error",
                "description": "Unable to generate strategic recommendations due to analysis error",
                "action_items": ["Retry analysis with different parameters"]
            }]

    async def _get_traditional_competitors(self, website_url: str, business_info: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Get traditional competitors using existing ALwrity logic.

        Args:
            website_url: User's website URL.
            business_info: Business information from onboarding.

        Returns:
            Whatever ``Step3ResearchService.discover_competitors`` returns. If
            that call raises, two placeholder entries flagged with
            ``"is_placeholder": True`` are returned instead.
        """
        try:
            # Use existing Step3ResearchService for traditional competitor discovery
            # Note: This will use the existing ALwrity logic without database dependency for Phase 1
            return await self.research_service.discover_competitors(website_url, business_info)
        except Exception as e:
            logger.error(f"[SIFOnboarding] Traditional competitor discovery failed: {e}")
            # NOTE(review): these are fabricated sample competitors kept for
            # Phase 1 testing. They are flagged so callers can filter them out;
            # consider returning an empty list once nothing depends on them.
            return [
                {
                    "name": f"Sample Competitor {index}",
                    "url": f"https://sample-competitor-{index}.com",
                    "description": "Traditional competitor discovered via ALwrity",
                    "discovery_method": "traditional",
                    "is_placeholder": True
                }
                for index in (1, 2)
            ]

    async def _discover_semantic_competitors(self, website_url: str, business_info: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Discover semantic competitors using Exa AI neural search.

        Args:
            website_url: User's website URL; its domain is excluded from the
                search when it can be determined.
            business_info: Business information; ``"description"`` and
                ``"industry"`` are used to build the query.

        Returns:
            One competitor dict per Exa result, or an empty list on failure.
        """
        try:
            # Use Exa API for semantic competitor discovery
            business_description = business_info.get("description", "")
            industry = business_info.get("industry", "")

            # Create semantic search query. Empty parts are skipped so the
            # query never starts with stray whitespace.
            semantic_query = " ".join(
                str(part) for part in (business_description, industry, "competitors alternatives") if part
            )

            # Never send an empty string as a domain to exclude.
            own_domain = self._extract_domain(website_url)
            exclude_domains = [own_domain] if own_domain else []

            # Search for semantically similar businesses
            exa_results = await self.exa_service.search_and_contents(
                semantic_query,
                num_results=10,
                exclude_domains=exclude_domains
            )

            # Format results as competitors
            return [
                {
                    "name": result.get("title", "Unknown Competitor"),
                    "url": result.get("url", ""),
                    "description": result.get("snippet", ""),
                    "discovery_method": "semantic_search",
                    "relevance_score": result.get("score", 0.0),
                    "semantic_match": True
                }
                for result in ((exa_results or {}).get("results") or [])
            ]

        except Exception as e:
            logger.error(f"[SIFOnboarding] Semantic competitor discovery failed: {e}")
            return []

    async def _fallback_to_traditional_discovery(
        self,
        website_url: str,
        business_info: Dict[str, Any],
        reason: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Fallback to traditional competitor discovery when SIF fails.

        Args:
            website_url: User's website URL.
            business_info: Business information from onboarding.
            reason: Optional short description of why the fallback was taken.
                When given it is reported as
                ``semantic_insights["fallback_reason"]``.

        Returns:
            A result with the same top-level keys as the enhanced flow. The
            ``semantic_insights`` entry carries the standard insight keys with
            empty values so consumers can index it without checking for the
            fallback first.
        """
        logger.warning(f"[SIFOnboarding] Falling back to traditional discovery for {website_url}")

        traditional_competitors = await self._get_traditional_competitors(website_url, business_info)

        semantic_insights: Dict[str, Any] = {
            "content_pillars": [],
            "semantic_gaps": [],
            "themes_analysis": None,
            "strategic_recommendations": [],
            "error": "Semantic analysis temporarily unavailable",
            "fallback_used": True
        }
        if reason:
            semantic_insights["fallback_reason"] = reason

        return {
            "traditional_competitors": traditional_competitors,
            "semantic_competitors": [],
            "semantic_insights": semantic_insights,
            "content_analysis": {
                "user_pages_analyzed": 0,
                "competitor_pages_analyzed": 0,
                "error": "Content harvesting failed"
            },
            "intelligence_status": {"status": "error", "error": "SIF initialization failed"}
        }

    def _extract_domain(self, url: str) -> str:
        """
        Extract domain from URL.

        Args:
            url: An absolute URL, or a scheme-less address such as
                ``"example.com/path"``.

        Returns:
            The network location (host and optional port). An empty string is
            returned when none can be found; the input is returned unchanged
            when it cannot be parsed.
        """
        try:
            domain = urlparse(url).netloc
            if not domain and url and "://" not in url:
                # Without a scheme urlparse places the host in .path, so parse
                # again as a network-path reference ("//host/path").
                domain = urlparse(f"//{url.lstrip('/')}").netloc
            return domain
        except (ValueError, TypeError, AttributeError):
            return url


# Integration helper functions for existing ALwrity code
def create_sif_enhanced_step3(user_id: str, db_session=None) -> SIFOnboardingIntegration:
    """
    Factory function to create SIF-enhanced Step 3 integration.

    Args:
        user_id: The user ID for the onboarding session
        db_session: Optional database session for enhanced functionality

    Returns:
        Configured SIFOnboardingIntegration instance
    """
    return SIFOnboardingIntegration(user_id, db_session)


async def enhance_step3_with_semantic_intelligence(
    user_id: str,
    website_url: str,
    business_info: Dict[str, Any],
    db_session=None
) -> Dict[str, Any]:
    """
    Convenience function to enhance Step 3 with semantic intelligence.

    Args:
        user_id: User ID
        website_url: User's website URL
        business_info: Business information from onboarding
        db_session: Optional database session for enhanced functionality

    Returns:
        Enhanced competitor analysis results
    """
    sif_integration = create_sif_enhanced_step3(user_id, db_session)
    return await sif_integration.enhance_competitor_discovery(website_url, business_info)


# Example usage for integration into existing Step 3 API
"""
# In step3_routes.py, enhance the existing competitor discovery endpoint:

from services.intelligence.sif_onboarding_integration import enhance_step3_with_semantic_intelligence

@app.post("/api/onboarding/step3/discover-competitors")
async def discover_competitors(request: CompetitorDiscoveryRequest, user=Depends(get_current_user)):
    # Existing traditional competitor discovery
    traditional_results = await step3_research_service.discover_competitors(
        request.website_url, request.business_info
    )

    # New: Enhanced with semantic intelligence
    enhanced_results = await enhance_step3_with_semantic_intelligence(
        user.id, request.website_url, request.business_info
    )

    # Combine results
    return {
        "traditional_competitors": traditional_results,
        "semantic_insights": enhanced_results["semantic_insights"],
        "content_analysis": enhanced_results["content_analysis"],
        "strategic_recommendations": enhanced_results["semantic_insights"]["strategic_recommendations"]
    }
"""