From 77088bfc53ff728a9c489ef4d8f0e4324247320f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D9=8A?= Date: Sun, 1 Mar 2026 21:58:44 +0530 Subject: [PATCH] Implement evidence-driven semantic gap detection --- .../intelligence/agents/specialized_agents.py | 195 ++++++++++++++++-- backend/services/intelligence/sif_agents.py | 194 ++++++++++++++++- .../services/intelligence/sif_integration.py | 74 +++---- backend/services/sif_onboarding_service.py | 26 ++- 4 files changed, 412 insertions(+), 77 deletions(-) diff --git a/backend/services/intelligence/agents/specialized_agents.py b/backend/services/intelligence/agents/specialized_agents.py index 1dc63aed..854aab29 100644 --- a/backend/services/intelligence/agents/specialized_agents.py +++ b/backend/services/intelligence/agents/specialized_agents.py @@ -7,6 +7,8 @@ Each agent leverages TxtaiIntelligenceService for semantic operations. import traceback import json import asyncio +import re +from collections import Counter from typing import List, Dict, Any, Optional from datetime import datetime from loguru import logger @@ -177,23 +179,193 @@ class StrategyArchitectAgent(SIFBaseAgent): self._log_agent_operation("Finding semantic content gaps", competitor_count=len(competitor_indices)) try: - # STUB: Implement cross-index comparison - # This would involve: - # 1. Getting user content topics/themes - # 2. Getting competitor content topics/themes - # 3. Finding topics competitors cover but user doesn't - - logger.info(f"[{self.__class__.__name__}] Found semantic gaps analysis stub") - return [ - {"topic": "Topic A", "priority": "high", "reason": "Competitor coverage gap"}, - {"topic": "Topic B", "priority": "medium", "reason": "Emerging trend"} - ] + documents = await self._fetch_index_documents() + if not documents: + logger.info(f"[{self.__class__.__name__}] No indexed documents available for gap detection") + return [] + + competitor_docs, user_docs = [], [] + allowed_competitor_ids = set(str(idx) for idx in competitor_indices) if competitor_indices else None + for doc in documents: + metadata = doc.get("metadata", {}) + doc_type = str(metadata.get("type", "")).lower() + if "competitor" in doc_type: + if allowed_competitor_ids and str(doc.get("id")) not in allowed_competitor_ids: + continue + competitor_docs.append(doc) + elif "user" in doc_type: + user_docs.append(doc) + + if not competitor_docs or not user_docs: + logger.info( + f"[{self.__class__.__name__}] Insufficient split for gap analysis: " + f"user_docs={len(user_docs)}, competitor_docs={len(competitor_docs)}" + ) + return [] + + competitor_topics = self._extract_topic_density(competitor_docs) + user_topics = self._extract_topic_density(user_docs) + + gaps = [] + for topic, competitor_density in competitor_topics.items(): + user_density = user_topics.get(topic, 0.0) + density_gap = competitor_density - user_density + if density_gap <= 0.08: + continue + + confidence = max( + 0.0, + min(1.0, 0.35 + (density_gap * 1.5) + (competitor_density * 0.4)) + ) + priority = "high" if confidence >= 0.75 else "medium" if confidence >= 0.5 else "low" + gaps.append({ + "topic": topic, + "priority": priority, + "reason": ( + f"Competitors mention '{topic}' substantially more often " + f"(density {competitor_density:.2f} vs {user_density:.2f})." + ), + "confidence": round(confidence, 3), + "topic_density": { + "competitor": round(competitor_density, 4), + "user": round(user_density, 4), + "gap": round(density_gap, 4) + }, + "evidence": { + "competitor_sample_titles": self._sample_titles_for_topic(competitor_docs, topic), + "user_sample_titles": self._sample_titles_for_topic(user_docs, topic), + "competitor_doc_count": len(competitor_docs), + "user_doc_count": len(user_docs) + } + }) + + gaps.sort( + key=lambda item: ( + item.get("confidence", 0), + item.get("topic_density", {}).get("gap", 0) + ), + reverse=True + ) + return gaps[:12] except Exception as e: logger.error(f"[{self.__class__.__name__}] Failed to find semantic gaps: {e}") logger.error(f"[{self.__class__.__name__}] Full traceback: {traceback.format_exc()}") return [] + async def _fetch_index_documents(self) -> List[Dict[str, Any]]: + """Fetch indexed documents and normalize metadata from txtai result objects.""" + if not self.intelligence.is_initialized() or not self.intelligence.embeddings: + return [] + + embeddings = self.intelligence.embeddings + limit = 0 + if hasattr(embeddings, "count"): + try: + limit = int(embeddings.count()) + except Exception: + limit = 0 + + documents = [] + candidate_queries = [] + if limit > 0: + candidate_queries.extend([ + f"select id, text, object from txtai limit {limit}", + f"select id, text, tags from txtai limit {limit}" + ]) + candidate_queries.extend(["marketing", "content", "seo", "strategy", "social media"]) + + seen_ids = set() + for query in candidate_queries: + try: + query_limit = limit if query.startswith("select") and limit > 0 else max(10, limit or 50) + rows = embeddings.search(query, limit=query_limit) + except Exception: + continue + + for row in rows or []: + doc_id = str(row.get("id", "")) + dedupe_key = doc_id or str(hash(f"{row.get('text','')}::{row.get('score',0)}")) + if dedupe_key in seen_ids: + continue + seen_ids.add(dedupe_key) + documents.append({ + "id": doc_id, + "text": row.get("text", "") or "", + "metadata": self._normalize_metadata(row) + }) + + if limit > 0 and len(documents) >= limit: + break + + return documents + + def _normalize_metadata(self, row: Dict[str, Any]) -> Dict[str, Any]: + """Normalize metadata payloads from txtai search rows.""" + for key in ("object", "tags", "metadata", "meta"): + payload = row.get(key) + if isinstance(payload, dict): + return payload + if isinstance(payload, str): + try: + parsed = json.loads(payload) + if isinstance(parsed, dict): + return parsed + except Exception: + continue + return {} + + def _extract_topic_density(self, documents: List[Dict[str, Any]]) -> Dict[str, float]: + """Extract topic density from document metadata and titles.""" + topic_counter: Counter = Counter() + + for doc in documents: + metadata = doc.get("metadata", {}) + candidates = [] + for key in ("topics", "topic", "keywords", "keyword", "tags", "category"): + value = metadata.get(key) + if isinstance(value, list): + candidates.extend([str(v) for v in value if v]) + elif isinstance(value, str) and value.strip(): + candidates.extend(re.split(r"[,|/]", value)) + + title = metadata.get("title") or doc.get("text", "")[:120] + if title: + title_tokens = re.findall(r"[a-zA-Z][a-zA-Z\-]{3,}", str(title).lower()) + candidates.extend(title_tokens) + + normalized = { + item.strip().lower() for item in candidates + if item and len(item.strip()) >= 4 and not item.strip().isdigit() + } + for topic in normalized: + topic_counter[topic] += 1 + + total_docs = max(1, len(documents)) + return { + topic: count / total_docs + for topic, count in topic_counter.items() + if count >= 2 + } + + def _sample_titles_for_topic(self, documents: List[Dict[str, Any]], topic: str, limit: int = 3) -> List[str]: + """Return sample titles for a topic.""" + samples = [] + topic_lower = topic.lower() + for doc in documents: + metadata = doc.get("metadata", {}) + title = metadata.get("title") or doc.get("text", "")[:100] + if not title: + continue + + haystack = f"{title} {json.dumps(metadata, default=str)}".lower() + if topic_lower in haystack: + samples.append(str(title)) + if len(samples) >= limit: + break + + return samples + class ContentGuardianAgent(SIFBaseAgent): """Agent for preventing cannibalization and ensuring content originality.""" @@ -2436,4 +2608,3 @@ class SocialAmplificationAgent(BaseALwrityAgent): "status": "scheduled", "timestamp": datetime.utcnow().isoformat() } - diff --git a/backend/services/intelligence/sif_agents.py b/backend/services/intelligence/sif_agents.py index ac15c3db..d7beaa82 100644 --- a/backend/services/intelligence/sif_agents.py +++ b/backend/services/intelligence/sif_agents.py @@ -7,6 +7,8 @@ Each agent leverages TxtaiIntelligenceService for semantic operations. import traceback import json import asyncio +import re +from collections import Counter from typing import List, Dict, Any, Optional from datetime import datetime from loguru import logger @@ -212,23 +214,193 @@ class StrategyArchitectAgent(SIFBaseAgent): self._log_agent_operation("Finding semantic content gaps", competitor_count=len(competitor_indices)) try: - # STUB: Implement cross-index comparison - # This would involve: - # 1. Getting user content topics/themes - # 2. Getting competitor content topics/themes - # 3. Finding topics competitors cover but user doesn't - - logger.info(f"[{self.__class__.__name__}] Found semantic gaps analysis stub") - return [ - {"topic": "Topic A", "priority": "high", "reason": "Competitor coverage gap"}, - {"topic": "Topic B", "priority": "medium", "reason": "Emerging trend"} - ] + documents = await self._fetch_index_documents() + if not documents: + logger.info(f"[{self.__class__.__name__}] No indexed documents available for gap detection") + return [] + + competitor_docs, user_docs = [], [] + allowed_competitor_ids = set(str(idx) for idx in competitor_indices) if competitor_indices else None + for doc in documents: + metadata = doc.get("metadata", {}) + doc_type = str(metadata.get("type", "")).lower() + if "competitor" in doc_type: + if allowed_competitor_ids and str(doc.get("id")) not in allowed_competitor_ids: + continue + competitor_docs.append(doc) + elif "user" in doc_type: + user_docs.append(doc) + + if not competitor_docs or not user_docs: + logger.info( + f"[{self.__class__.__name__}] Insufficient split for gap analysis: " + f"user_docs={len(user_docs)}, competitor_docs={len(competitor_docs)}" + ) + return [] + + competitor_topics = self._extract_topic_density(competitor_docs) + user_topics = self._extract_topic_density(user_docs) + + gaps = [] + for topic, competitor_density in competitor_topics.items(): + user_density = user_topics.get(topic, 0.0) + density_gap = competitor_density - user_density + if density_gap <= 0.08: + continue + + confidence = max( + 0.0, + min(1.0, 0.35 + (density_gap * 1.5) + (competitor_density * 0.4)) + ) + priority = "high" if confidence >= 0.75 else "medium" if confidence >= 0.5 else "low" + gaps.append({ + "topic": topic, + "priority": priority, + "reason": ( + f"Competitors mention '{topic}' substantially more often " + f"(density {competitor_density:.2f} vs {user_density:.2f})." + ), + "confidence": round(confidence, 3), + "topic_density": { + "competitor": round(competitor_density, 4), + "user": round(user_density, 4), + "gap": round(density_gap, 4) + }, + "evidence": { + "competitor_sample_titles": self._sample_titles_for_topic(competitor_docs, topic), + "user_sample_titles": self._sample_titles_for_topic(user_docs, topic), + "competitor_doc_count": len(competitor_docs), + "user_doc_count": len(user_docs) + } + }) + + gaps.sort( + key=lambda item: ( + item.get("confidence", 0), + item.get("topic_density", {}).get("gap", 0) + ), + reverse=True + ) + return gaps[:12] except Exception as e: logger.error(f"[{self.__class__.__name__}] Failed to find semantic gaps: {e}") logger.error(f"[{self.__class__.__name__}] Full traceback: {traceback.format_exc()}") return [] + async def _fetch_index_documents(self) -> List[Dict[str, Any]]: + """Fetch indexed documents and normalize metadata from txtai result objects.""" + if not self.intelligence.is_initialized() or not self.intelligence.embeddings: + return [] + + embeddings = self.intelligence.embeddings + limit = 0 + if hasattr(embeddings, "count"): + try: + limit = int(embeddings.count()) + except Exception: + limit = 0 + + documents = [] + candidate_queries = [] + if limit > 0: + candidate_queries.extend([ + f"select id, text, object from txtai limit {limit}", + f"select id, text, tags from txtai limit {limit}" + ]) + candidate_queries.extend(["marketing", "content", "seo", "strategy", "social media"]) + + seen_ids = set() + for query in candidate_queries: + try: + query_limit = limit if query.startswith("select") and limit > 0 else max(10, limit or 50) + rows = embeddings.search(query, limit=query_limit) + except Exception: + continue + + for row in rows or []: + doc_id = str(row.get("id", "")) + dedupe_key = doc_id or str(hash(f"{row.get('text','')}::{row.get('score',0)}")) + if dedupe_key in seen_ids: + continue + seen_ids.add(dedupe_key) + documents.append({ + "id": doc_id, + "text": row.get("text", "") or "", + "metadata": self._normalize_metadata(row) + }) + + if limit > 0 and len(documents) >= limit: + break + + return documents + + def _normalize_metadata(self, row: Dict[str, Any]) -> Dict[str, Any]: + """Normalize metadata payloads from txtai search rows.""" + for key in ("object", "tags", "metadata", "meta"): + payload = row.get(key) + if isinstance(payload, dict): + return payload + if isinstance(payload, str): + try: + parsed = json.loads(payload) + if isinstance(parsed, dict): + return parsed + except Exception: + continue + return {} + + def _extract_topic_density(self, documents: List[Dict[str, Any]]) -> Dict[str, float]: + """Extract topic density from document metadata and titles.""" + topic_counter: Counter = Counter() + + for doc in documents: + metadata = doc.get("metadata", {}) + candidates = [] + for key in ("topics", "topic", "keywords", "keyword", "tags", "category"): + value = metadata.get(key) + if isinstance(value, list): + candidates.extend([str(v) for v in value if v]) + elif isinstance(value, str) and value.strip(): + candidates.extend(re.split(r"[,|/]", value)) + + title = metadata.get("title") or doc.get("text", "")[:120] + if title: + title_tokens = re.findall(r"[a-zA-Z][a-zA-Z\-]{3,}", str(title).lower()) + candidates.extend(title_tokens) + + normalized = { + item.strip().lower() for item in candidates + if item and len(item.strip()) >= 4 and not item.strip().isdigit() + } + for topic in normalized: + topic_counter[topic] += 1 + + total_docs = max(1, len(documents)) + return { + topic: count / total_docs + for topic, count in topic_counter.items() + if count >= 2 + } + + def _sample_titles_for_topic(self, documents: List[Dict[str, Any]], topic: str, limit: int = 3) -> List[str]: + """Return sample titles for a topic.""" + samples = [] + topic_lower = topic.lower() + for doc in documents: + metadata = doc.get("metadata", {}) + title = metadata.get("title") or doc.get("text", "")[:100] + if not title: + continue + + haystack = f"{title} {json.dumps(metadata, default=str)}".lower() + if topic_lower in haystack: + samples.append(str(title)) + if len(samples) >= limit: + break + + return samples + class ContentGuardianAgent(SIFBaseAgent): """Agent for preventing cannibalization and ensuring content originality.""" diff --git a/backend/services/intelligence/sif_integration.py b/backend/services/intelligence/sif_integration.py index 9cf96eb2..bbbb12f5 100644 --- a/backend/services/intelligence/sif_integration.py +++ b/backend/services/intelligence/sif_integration.py @@ -974,60 +974,36 @@ class SIFIntegrationService: return pillars async def _identify_semantic_gaps(self, website_data: Dict[str, Any]) -> List[Dict[str, Any]]: - """ - Identify semantic gaps in user content by comparing against competitor topics or industry standards. - Uses txtai semantic search to check coverage of key topics. - """ - gaps = [] + """Identify semantic gaps using StrategyArchitectAgent evidence-driven analysis.""" try: - # 1. Determine target topics to check - # In a real scenario, these come from competitor analysis or keyword research. - # Here we extract potential topics from competitor data or use defaults. - competitors = website_data.get('competitors', []) - target_topics = [] - - # Placeholder: Extract topics from competitor names/descriptions if available - # For now, we'll use a mix of generic marketing topics and any provided tags - target_topics = [ - "content strategy", "SEO optimization", "social media marketing", - "email campaigns", "brand storytelling", "customer retention", - "voice search", "video marketing", "influencer partnerships" - ] - - # Add specific topics from input if available - if 'target_keywords' in website_data: - target_topics.extend(website_data['target_keywords']) + if not self.strategy_agent: + from .sif_agents import StrategyArchitectAgent + self.strategy_agent = StrategyArchitectAgent(self.intelligence_service, user_id=self.user_id) - # 2. Check coverage for each topic in the user's index - for topic in target_topics: - # Search the user's index - results = await self.intelligence_service.search(topic, limit=1) - - # Check relevance - max_score = results[0]['score'] if results else 0.0 - - # If relevance is low, it's a gap - GAP_THRESHOLD = 0.45 - if max_score < GAP_THRESHOLD: - gaps.append({ - "topic": topic, - "current_coverage_score": float(max_score), - "gap_severity": "high" if max_score < 0.2 else "medium", - "reason": "Low semantic relevance in current content index", - "suggested_action": f"Create dedicated content for '{topic}'" - }) - - # Sort by severity (lower score = higher severity) - gaps.sort(key=lambda x: x['current_coverage_score']) - - return gaps[:5] # Return top 5 gaps + competitor_ids = website_data.get("competitor_indices", []) or [] + gaps = await self.strategy_agent.find_semantic_gaps(competitor_indices=competitor_ids) + + normalized_gaps = [] + for gap in gaps: + density = gap.get("topic_density", {}) + normalized_gaps.append({ + "topic": gap.get("topic"), + "priority": gap.get("priority", "medium"), + "reason": gap.get("reason", "Competitor coverage gap"), + "confidence": gap.get("confidence", 0.0), + "current_coverage_score": density.get("user", 0.0), + "competitor_coverage_score": density.get("competitor", 0.0), + "gap_severity": gap.get("priority", "medium"), + "suggested_action": f"Create dedicated content for '{gap.get('topic', 'this topic')}'", + "topic_density": density, + "evidence": gap.get("evidence", {}) + }) + + return normalized_gaps except Exception as e: logger.error(f"Error identifying semantic gaps: {e}") - # Fallback to sample data if index search fails completely - return [ - {"topic": "error_fallback", "reason": str(e), "current_coverage_score": 0.0} - ] + return [] async def _analyze_competitor_semantics(self, website_data: Dict[str, Any]) -> Dict[str, Any]: """Analyze competitor semantic positioning.""" diff --git a/backend/services/sif_onboarding_service.py b/backend/services/sif_onboarding_service.py index 9bf9fb82..7c151ee9 100644 --- a/backend/services/sif_onboarding_service.py +++ b/backend/services/sif_onboarding_service.py @@ -52,9 +52,9 @@ class SIFOnboardingIntegration: self.harvester = SemanticHarvesterService() # Initialize agents - self.strategy_agent = StrategyArchitectAgent(self.intelligence) - self.guardian_agent = ContentGuardianAgent(self.intelligence) - self.link_agent = LinkGraphAgent(self.intelligence) + self.strategy_agent = StrategyArchitectAgent(self.intelligence, user_id) + self.guardian_agent = ContentGuardianAgent(self.intelligence, user_id) + self.link_agent = LinkGraphAgent(self.intelligence, user_id) logger.info(f"[SIFOnboarding] Initialized for user {user_id}") @@ -254,7 +254,23 @@ class SIFOnboardingIntegration: "priority": "high", "title": "Fill Content Gaps", "description": f"Competitors are covering {len(semantic_gaps)} topics you haven't addressed.", - "action_items": [f"Create content about: {gap.get('topic', 'Unknown topic')}" for gap in semantic_gaps[:5]] + "action_items": [ + ( + f"Create content about: {gap.get('topic', 'Unknown topic')} " + f"({gap.get('priority', 'medium')} priority) - {gap.get('reason', 'Coverage gap identified')}" + ) + for gap in semantic_gaps[:5] + ], + "evidence": [ + { + "topic": gap.get("topic"), + "priority": gap.get("priority"), + "confidence": gap.get("confidence"), + "topic_density": gap.get("topic_density"), + "competitor_sample_titles": gap.get("evidence", {}).get("competitor_sample_titles", []) + } + for gap in semantic_gaps[:5] + ] }) # Theme-based recommendations @@ -448,4 +464,4 @@ async def discover_competitors(request: CompetitorDiscoveryRequest, user=Depends "content_analysis": enhanced_results["content_analysis"], "strategic_recommendations": enhanced_results["semantic_insights"]["strategic_recommendations"] } -""" \ No newline at end of file +"""