From cd9ffb5ef59d2ed03b286bf852ebac1392fd6edb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D9=8A?= Date: Mon, 2 Mar 2026 11:42:52 +0530 Subject: [PATCH] Implement evidence-based semantic gap detection for strategy agents --- .../intelligence/agents/specialized_agents.py | 120 ++++++++++++------ backend/services/intelligence/sif_agents.py | 113 ++++++++++++----- backend/services/sif_onboarding_service.py | 100 +++++++++------ 3 files changed, 226 insertions(+), 107 deletions(-) diff --git a/backend/services/intelligence/agents/specialized_agents.py b/backend/services/intelligence/agents/specialized_agents.py index 677d91f9..333799b0 100644 --- a/backend/services/intelligence/agents/specialized_agents.py +++ b/backend/services/intelligence/agents/specialized_agents.py @@ -174,7 +174,7 @@ class StrategyArchitectAgent(SIFBaseAgent): return proposals - async def find_semantic_gaps(self, competitor_indices: List[int]) -> List[Dict[str, Any]]: + async def find_semantic_gaps(self, competitor_indices: List[Any]) -> List[Dict[str, Any]]: """Compare user content vs competitor content to find missing topics.""" self._log_agent_operation("Finding semantic content gaps", competitor_count=len(competitor_indices)) @@ -186,14 +186,19 @@ class StrategyArchitectAgent(SIFBaseAgent): competitor_docs, user_docs = [], [] allowed_competitor_ids = set(str(idx) for idx in competitor_indices) if competitor_indices else None + if allowed_competitor_ids: + for idx in competitor_indices: + if isinstance(idx, int) and 0 <= idx < len(documents): + allowed_competitor_ids.add(str(documents[idx].get("id", ""))) + for doc in documents: metadata = doc.get("metadata", {}) - doc_type = str(metadata.get("type", "")).lower() - if "competitor" in doc_type: + role = self._infer_document_role(metadata) + if role == "competitor": if allowed_competitor_ids and str(doc.get("id")) not in allowed_competitor_ids: continue competitor_docs.append(doc) - elif "user" in doc_type: + elif role == "user": user_docs.append(doc) if not competitor_docs or not user_docs: @@ -203,28 +208,23 @@ class StrategyArchitectAgent(SIFBaseAgent): ) return [] - # FIX: Ensure we correctly map indices to documents if indices were passed as integers - # The filter allowed_competitor_ids uses str(idx) but if competitor_indices contained - # positional indices instead of IDs, we might have filtered everything out. - # In this implementation, we assume competitor_indices are doc IDs. - # If they are positional, we need a way to map them. - # For now, we trust the caller passed IDs. - competitor_topics = self._extract_topic_density(competitor_docs) user_topics = self._extract_topic_density(user_docs) + competitor_topic_docs = self._map_topic_to_doc_titles(competitor_docs) + user_topic_docs = self._map_topic_to_doc_titles(user_docs) gaps = [] for topic, competitor_density in competitor_topics.items(): user_density = user_topics.get(topic, 0.0) - density_gap = competitor_density - user_density - if density_gap <= 0.08: + coverage_delta = competitor_density - user_density + if coverage_delta <= 0.08: continue - confidence = max( - 0.0, - min(1.0, 0.35 + (density_gap * 1.5) + (competitor_density * 0.4)) - ) - priority = "high" if confidence >= 0.75 else "medium" if confidence >= 0.5 else "low" + competitor_support = len(competitor_topic_docs.get(topic, [])) + user_support = len(user_topic_docs.get(topic, [])) + confidence = max(0.0, min(1.0, (coverage_delta * 0.65) + (min(1.0, competitor_support / 4) * 0.35))) + severity_score = max(0.0, min(1.0, (coverage_delta * 0.7) + (confidence * 0.3))) + priority = "high" if severity_score >= 0.72 else "medium" if severity_score >= 0.45 else "low" gaps.append({ "topic": topic, "priority": priority, @@ -233,14 +233,18 @@ class StrategyArchitectAgent(SIFBaseAgent): f"(density {competitor_density:.2f} vs {user_density:.2f})." ), "confidence": round(confidence, 3), + "severity_score": round(severity_score, 3), + "coverage_delta": round(coverage_delta, 4), "topic_density": { "competitor": round(competitor_density, 4), "user": round(user_density, 4), - "gap": round(density_gap, 4) + "gap": round(coverage_delta, 4) }, "evidence": { "competitor_sample_titles": self._sample_titles_for_topic(competitor_docs, topic), "user_sample_titles": self._sample_titles_for_topic(user_docs, topic), + "competitor_supporting_docs": competitor_support, + "user_supporting_docs": user_support, "competitor_doc_count": len(competitor_docs), "user_doc_count": len(user_docs) } @@ -248,6 +252,7 @@ class StrategyArchitectAgent(SIFBaseAgent): gaps.sort( key=lambda item: ( + item.get("severity_score", 0), item.get("confidence", 0), item.get("topic_density", {}).get("gap", 0) ), @@ -327,25 +332,7 @@ class StrategyArchitectAgent(SIFBaseAgent): topic_counter: Counter = Counter() for doc in documents: - metadata = doc.get("metadata", {}) - candidates = [] - for key in ("topics", "topic", "keywords", "keyword", "tags", "category"): - value = metadata.get(key) - if isinstance(value, list): - candidates.extend([str(v) for v in value if v]) - elif isinstance(value, str) and value.strip(): - candidates.extend(re.split(r"[,|/]", value)) - - title = metadata.get("title") or doc.get("text", "")[:120] - if title: - title_tokens = re.findall(r"[a-zA-Z][a-zA-Z\-]{3,}", str(title).lower()) - candidates.extend(title_tokens) - - normalized = { - item.strip().lower() for item in candidates - if item and len(item.strip()) >= 4 and not item.strip().isdigit() - } - for topic in normalized: + for topic in self._extract_topics_from_document(doc): topic_counter[topic] += 1 total_docs = max(1, len(documents)) @@ -355,6 +342,63 @@ class StrategyArchitectAgent(SIFBaseAgent): if count >= 2 } + def _infer_document_role(self, metadata: Dict[str, Any]) -> str: + """Infer whether a document belongs to user content or competitor content.""" + signals = [ + metadata.get("type", ""), + metadata.get("doc_type", ""), + metadata.get("content_type", ""), + metadata.get("source", ""), + metadata.get("origin", "") + ] + signal_blob = " ".join(str(item).lower() for item in signals if item) + + if any(token in signal_blob for token in ("competitor", "rival", "market_peer")): + return "competitor" + if any(token in signal_blob for token in ("user", "owned", "first_party", "customer_site")): + return "user" + return "unknown" + + def _extract_topics_from_document(self, doc: Dict[str, Any]) -> List[str]: + """Extract normalized topic labels from metadata and lightweight text fields.""" + metadata = doc.get("metadata", {}) + candidates: List[str] = [] + + for key in ("topics", "topic", "themes", "theme", "keywords", "keyword", "tags", "category", "categories"): + value = metadata.get(key) + if isinstance(value, list): + candidates.extend([str(v) for v in value if v]) + elif isinstance(value, str) and value.strip(): + candidates.extend(re.split(r"[,|/]", value)) + + title = metadata.get("title") or doc.get("text", "")[:160] + if title: + candidates.extend(re.findall(r"[a-zA-Z][a-zA-Z\-]{3,}", str(title).lower())) + + stopwords = { + "with", "from", "that", "this", "your", "about", "into", "using", "guide", "best", + "tips", "what", "when", "where", "how", "the", "and", "for", "2024", "2025" + } + normalized = { + item.strip().lower() + for item in candidates + if item + and len(item.strip()) >= 4 + and not item.strip().isdigit() + and item.strip().lower() not in stopwords + } + return sorted(normalized) + + def _map_topic_to_doc_titles(self, documents: List[Dict[str, Any]]) -> Dict[str, List[str]]: + """Map each topic to a list of document titles that support it.""" + mapping: Dict[str, List[str]] = {} + for doc in documents: + metadata = doc.get("metadata", {}) + title = str(metadata.get("title") or doc.get("text", "")[:100] or "Untitled") + for topic in self._extract_topics_from_document(doc): + mapping.setdefault(topic, []).append(title) + return mapping + def _sample_titles_for_topic(self, documents: List[Dict[str, Any]], topic: str, limit: int = 3) -> List[str]: """Return sample titles for a topic.""" samples = [] diff --git a/backend/services/intelligence/sif_agents.py b/backend/services/intelligence/sif_agents.py index d7beaa82..6bc697c0 100644 --- a/backend/services/intelligence/sif_agents.py +++ b/backend/services/intelligence/sif_agents.py @@ -209,7 +209,7 @@ class StrategyArchitectAgent(SIFBaseAgent): # Simple confidence based on cluster size - larger clusters are more reliable return min(1.0, len(cluster_indices) / 10.0) - async def find_semantic_gaps(self, competitor_indices: List[int]) -> List[Dict[str, Any]]: + async def find_semantic_gaps(self, competitor_indices: List[Any]) -> List[Dict[str, Any]]: """Compare user content vs competitor content to find missing topics.""" self._log_agent_operation("Finding semantic content gaps", competitor_count=len(competitor_indices)) @@ -221,14 +221,19 @@ class StrategyArchitectAgent(SIFBaseAgent): competitor_docs, user_docs = [], [] allowed_competitor_ids = set(str(idx) for idx in competitor_indices) if competitor_indices else None + if allowed_competitor_ids: + for idx in competitor_indices: + if isinstance(idx, int) and 0 <= idx < len(documents): + allowed_competitor_ids.add(str(documents[idx].get("id", ""))) + for doc in documents: metadata = doc.get("metadata", {}) - doc_type = str(metadata.get("type", "")).lower() - if "competitor" in doc_type: + role = self._infer_document_role(metadata) + if role == "competitor": if allowed_competitor_ids and str(doc.get("id")) not in allowed_competitor_ids: continue competitor_docs.append(doc) - elif "user" in doc_type: + elif role == "user": user_docs.append(doc) if not competitor_docs or not user_docs: @@ -240,19 +245,21 @@ class StrategyArchitectAgent(SIFBaseAgent): competitor_topics = self._extract_topic_density(competitor_docs) user_topics = self._extract_topic_density(user_docs) + competitor_topic_docs = self._map_topic_to_doc_titles(competitor_docs) + user_topic_docs = self._map_topic_to_doc_titles(user_docs) gaps = [] for topic, competitor_density in competitor_topics.items(): user_density = user_topics.get(topic, 0.0) - density_gap = competitor_density - user_density - if density_gap <= 0.08: + coverage_delta = competitor_density - user_density + if coverage_delta <= 0.08: continue - confidence = max( - 0.0, - min(1.0, 0.35 + (density_gap * 1.5) + (competitor_density * 0.4)) - ) - priority = "high" if confidence >= 0.75 else "medium" if confidence >= 0.5 else "low" + competitor_support = len(competitor_topic_docs.get(topic, [])) + user_support = len(user_topic_docs.get(topic, [])) + confidence = max(0.0, min(1.0, (coverage_delta * 0.65) + (min(1.0, competitor_support / 4) * 0.35))) + severity_score = max(0.0, min(1.0, (coverage_delta * 0.7) + (confidence * 0.3))) + priority = "high" if severity_score >= 0.72 else "medium" if severity_score >= 0.45 else "low" gaps.append({ "topic": topic, "priority": priority, @@ -261,14 +268,18 @@ class StrategyArchitectAgent(SIFBaseAgent): f"(density {competitor_density:.2f} vs {user_density:.2f})." ), "confidence": round(confidence, 3), + "severity_score": round(severity_score, 3), + "coverage_delta": round(coverage_delta, 4), "topic_density": { "competitor": round(competitor_density, 4), "user": round(user_density, 4), - "gap": round(density_gap, 4) + "gap": round(coverage_delta, 4) }, "evidence": { "competitor_sample_titles": self._sample_titles_for_topic(competitor_docs, topic), "user_sample_titles": self._sample_titles_for_topic(user_docs, topic), + "competitor_supporting_docs": competitor_support, + "user_supporting_docs": user_support, "competitor_doc_count": len(competitor_docs), "user_doc_count": len(user_docs) } @@ -276,6 +287,7 @@ class StrategyArchitectAgent(SIFBaseAgent): gaps.sort( key=lambda item: ( + item.get("severity_score", 0), item.get("confidence", 0), item.get("topic_density", {}).get("gap", 0) ), @@ -355,25 +367,7 @@ class StrategyArchitectAgent(SIFBaseAgent): topic_counter: Counter = Counter() for doc in documents: - metadata = doc.get("metadata", {}) - candidates = [] - for key in ("topics", "topic", "keywords", "keyword", "tags", "category"): - value = metadata.get(key) - if isinstance(value, list): - candidates.extend([str(v) for v in value if v]) - elif isinstance(value, str) and value.strip(): - candidates.extend(re.split(r"[,|/]", value)) - - title = metadata.get("title") or doc.get("text", "")[:120] - if title: - title_tokens = re.findall(r"[a-zA-Z][a-zA-Z\-]{3,}", str(title).lower()) - candidates.extend(title_tokens) - - normalized = { - item.strip().lower() for item in candidates - if item and len(item.strip()) >= 4 and not item.strip().isdigit() - } - for topic in normalized: + for topic in self._extract_topics_from_document(doc): topic_counter[topic] += 1 total_docs = max(1, len(documents)) @@ -383,6 +377,63 @@ class StrategyArchitectAgent(SIFBaseAgent): if count >= 2 } + def _infer_document_role(self, metadata: Dict[str, Any]) -> str: + """Infer whether a document belongs to user content or competitor content.""" + signals = [ + metadata.get("type", ""), + metadata.get("doc_type", ""), + metadata.get("content_type", ""), + metadata.get("source", ""), + metadata.get("origin", "") + ] + signal_blob = " ".join(str(item).lower() for item in signals if item) + + if any(token in signal_blob for token in ("competitor", "rival", "market_peer")): + return "competitor" + if any(token in signal_blob for token in ("user", "owned", "first_party", "customer_site")): + return "user" + return "unknown" + + def _extract_topics_from_document(self, doc: Dict[str, Any]) -> List[str]: + """Extract normalized topic labels from metadata and lightweight text fields.""" + metadata = doc.get("metadata", {}) + candidates: List[str] = [] + + for key in ("topics", "topic", "themes", "theme", "keywords", "keyword", "tags", "category", "categories"): + value = metadata.get(key) + if isinstance(value, list): + candidates.extend([str(v) for v in value if v]) + elif isinstance(value, str) and value.strip(): + candidates.extend(re.split(r"[,|/]", value)) + + title = metadata.get("title") or doc.get("text", "")[:160] + if title: + candidates.extend(re.findall(r"[a-zA-Z][a-zA-Z\-]{3,}", str(title).lower())) + + stopwords = { + "with", "from", "that", "this", "your", "about", "into", "using", "guide", "best", + "tips", "what", "when", "where", "how", "the", "and", "for", "2024", "2025" + } + normalized = { + item.strip().lower() + for item in candidates + if item + and len(item.strip()) >= 4 + and not item.strip().isdigit() + and item.strip().lower() not in stopwords + } + return sorted(normalized) + + def _map_topic_to_doc_titles(self, documents: List[Dict[str, Any]]) -> Dict[str, List[str]]: + """Map each topic to a list of document titles that support it.""" + mapping: Dict[str, List[str]] = {} + for doc in documents: + metadata = doc.get("metadata", {}) + title = str(metadata.get("title") or doc.get("text", "")[:100] or "Untitled") + for topic in self._extract_topics_from_document(doc): + mapping.setdefault(topic, []).append(title) + return mapping + def _sample_titles_for_topic(self, documents: List[Dict[str, Any]], topic: str, limit: int = 3) -> List[str]: """Return sample titles for a topic.""" samples = [] diff --git a/backend/services/sif_onboarding_service.py b/backend/services/sif_onboarding_service.py index 7c151ee9..6eab1841 100644 --- a/backend/services/sif_onboarding_service.py +++ b/backend/services/sif_onboarding_service.py @@ -153,10 +153,16 @@ class SIFOnboardingIntegration: content_pillars = await self.strategy_agent.discover_pillars() # Find semantic gaps (what competitors cover that user doesn't) - semantic_gaps = await self.strategy_agent.find_semantic_gaps(competitor_indices=[]) - + indexed_documents = await self.strategy_agent._fetch_index_documents() + competitor_doc_ids = [ + str(doc.get("id", "")) + for doc in indexed_documents + if self.strategy_agent._infer_document_role(doc.get("metadata", {})) == "competitor" + ] + semantic_gaps = await self.strategy_agent.find_semantic_gaps(competitor_indices=competitor_doc_ids) + # Analyze content themes and topics - themes_analysis = await self._analyze_content_themes(user_content, competitor_content) + themes_analysis = await self._analyze_content_themes(indexed_documents) # Generate strategic recommendations recommendations = await self._generate_strategic_recommendations( @@ -185,47 +191,65 @@ class SIFOnboardingIntegration: "error": str(e) } - async def _analyze_content_themes(self, user_content: List[Dict], competitor_content: List[Dict]) -> Optional[Dict[str, Any]]: - """Analyze content themes and topics using semantic search.""" + async def _analyze_content_themes(self, indexed_documents: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]: + """Analyze themes from indexed metadata instead of static literals.""" logger.info("[SIFOnboarding] Analyzing content themes") - + try: - # Combine all content for theme analysis - all_content = user_content + competitor_content - - if not all_content: + if not indexed_documents: return None - - # Extract key themes using semantic search - themes = [] - theme_queries = [ - "digital marketing strategies", - "content marketing best practices", - "SEO optimization techniques", - "social media marketing", - "email marketing campaigns", - "brand positioning and messaging" + + user_docs = [ + doc for doc in indexed_documents + if self.strategy_agent._infer_document_role(doc.get("metadata", {})) == "user" ] - - for query in theme_queries: - results = await self.intelligence.search(query, limit=3) - if results: - themes.append({ - "theme": query, - "relevance_score": results[0].get("score", 0) if results else 0, - "top_result": results[0] if results else None - }) - - # Sort themes by relevance - themes.sort(key=lambda x: x["relevance_score"], reverse=True) - + competitor_docs = [ + doc for doc in indexed_documents + if self.strategy_agent._infer_document_role(doc.get("metadata", {})) == "competitor" + ] + if not user_docs and not competitor_docs: + return None + + user_theme_density = self.strategy_agent._extract_topic_density(user_docs) + competitor_theme_density = self.strategy_agent._extract_topic_density(competitor_docs) + all_topics = set(user_theme_density) | set(competitor_theme_density) + + ranked_themes = [] + for topic in all_topics: + user_score = user_theme_density.get(topic, 0.0) + competitor_score = competitor_theme_density.get(topic, 0.0) + ranked_themes.append({ + "theme": topic, + "user_density": round(user_score, 4), + "competitor_density": round(competitor_score, 4), + "combined_relevance": round((user_score + competitor_score) / 2, 4), + "coverage_delta": round(competitor_score - user_score, 4), + "classification": ( + "competitor_led" + if competitor_score > user_score + 0.05 + else "user_led" + if user_score > competitor_score + 0.05 + else "shared" + ), + "evidence": { + "user_sample_titles": self.strategy_agent._sample_titles_for_topic(user_docs, topic), + "competitor_sample_titles": self.strategy_agent._sample_titles_for_topic(competitor_docs, topic) + } + }) + + ranked_themes.sort( + key=lambda item: (item["combined_relevance"], abs(item["coverage_delta"])), + reverse=True + ) + return { - "top_themes": themes[:5], - "total_themes_analyzed": len(themes), - "user_content_themes": [t for t in themes if any(t["theme"] in page.get("content", "") for page in user_content)], - "competitor_content_themes": [t for t in themes if any(t["theme"] in page.get("content", "") for page in competitor_content)] + "top_themes": ranked_themes[:8], + "total_themes_analyzed": len(ranked_themes), + "user_theme_count": len(user_theme_density), + "competitor_theme_count": len(competitor_theme_density), + "theme_source": "indexed_metadata" } - + except Exception as e: logger.error(f"[SIFOnboarding] Theme analysis failed: {e}") return None