Merge branch 'review/pr-363'

This commit is contained in:
ajaysi
2026-03-02 11:46:02 +05:30
3 changed files with 226 additions and 107 deletions

View File

@@ -174,7 +174,7 @@ class StrategyArchitectAgent(SIFBaseAgent):
return proposals
async def find_semantic_gaps(self, competitor_indices: List[int]) -> List[Dict[str, Any]]:
async def find_semantic_gaps(self, competitor_indices: List[Any]) -> List[Dict[str, Any]]:
"""Compare user content vs competitor content to find missing topics."""
self._log_agent_operation("Finding semantic content gaps", competitor_count=len(competitor_indices))
@@ -186,14 +186,19 @@ class StrategyArchitectAgent(SIFBaseAgent):
competitor_docs, user_docs = [], []
allowed_competitor_ids = set(str(idx) for idx in competitor_indices) if competitor_indices else None
if allowed_competitor_ids:
for idx in competitor_indices:
if isinstance(idx, int) and 0 <= idx < len(documents):
allowed_competitor_ids.add(str(documents[idx].get("id", "")))
for doc in documents:
metadata = doc.get("metadata", {})
doc_type = str(metadata.get("type", "")).lower()
if "competitor" in doc_type:
role = self._infer_document_role(metadata)
if role == "competitor":
if allowed_competitor_ids and str(doc.get("id")) not in allowed_competitor_ids:
continue
competitor_docs.append(doc)
elif "user" in doc_type:
elif role == "user":
user_docs.append(doc)
if not competitor_docs or not user_docs:
@@ -203,28 +208,23 @@ class StrategyArchitectAgent(SIFBaseAgent):
)
return []
# FIX: Ensure we correctly map indices to documents if indices were passed as integers
# The filter allowed_competitor_ids uses str(idx) but if competitor_indices contained
# positional indices instead of IDs, we might have filtered everything out.
# In this implementation, we assume competitor_indices are doc IDs.
# If they are positional, we need a way to map them.
# For now, we trust the caller passed IDs.
competitor_topics = self._extract_topic_density(competitor_docs)
user_topics = self._extract_topic_density(user_docs)
competitor_topic_docs = self._map_topic_to_doc_titles(competitor_docs)
user_topic_docs = self._map_topic_to_doc_titles(user_docs)
gaps = []
for topic, competitor_density in competitor_topics.items():
user_density = user_topics.get(topic, 0.0)
density_gap = competitor_density - user_density
if density_gap <= 0.08:
coverage_delta = competitor_density - user_density
if coverage_delta <= 0.08:
continue
confidence = max(
0.0,
min(1.0, 0.35 + (density_gap * 1.5) + (competitor_density * 0.4))
)
priority = "high" if confidence >= 0.75 else "medium" if confidence >= 0.5 else "low"
competitor_support = len(competitor_topic_docs.get(topic, []))
user_support = len(user_topic_docs.get(topic, []))
confidence = max(0.0, min(1.0, (coverage_delta * 0.65) + (min(1.0, competitor_support / 4) * 0.35)))
severity_score = max(0.0, min(1.0, (coverage_delta * 0.7) + (confidence * 0.3)))
priority = "high" if severity_score >= 0.72 else "medium" if severity_score >= 0.45 else "low"
gaps.append({
"topic": topic,
"priority": priority,
@@ -233,14 +233,18 @@ class StrategyArchitectAgent(SIFBaseAgent):
f"(density {competitor_density:.2f} vs {user_density:.2f})."
),
"confidence": round(confidence, 3),
"severity_score": round(severity_score, 3),
"coverage_delta": round(coverage_delta, 4),
"topic_density": {
"competitor": round(competitor_density, 4),
"user": round(user_density, 4),
"gap": round(density_gap, 4)
"gap": round(coverage_delta, 4)
},
"evidence": {
"competitor_sample_titles": self._sample_titles_for_topic(competitor_docs, topic),
"user_sample_titles": self._sample_titles_for_topic(user_docs, topic),
"competitor_supporting_docs": competitor_support,
"user_supporting_docs": user_support,
"competitor_doc_count": len(competitor_docs),
"user_doc_count": len(user_docs)
}
@@ -248,6 +252,7 @@ class StrategyArchitectAgent(SIFBaseAgent):
gaps.sort(
key=lambda item: (
item.get("severity_score", 0),
item.get("confidence", 0),
item.get("topic_density", {}).get("gap", 0)
),
@@ -327,25 +332,7 @@ class StrategyArchitectAgent(SIFBaseAgent):
topic_counter: Counter = Counter()
for doc in documents:
metadata = doc.get("metadata", {})
candidates = []
for key in ("topics", "topic", "keywords", "keyword", "tags", "category"):
value = metadata.get(key)
if isinstance(value, list):
candidates.extend([str(v) for v in value if v])
elif isinstance(value, str) and value.strip():
candidates.extend(re.split(r"[,|/]", value))
title = metadata.get("title") or doc.get("text", "")[:120]
if title:
title_tokens = re.findall(r"[a-zA-Z][a-zA-Z\-]{3,}", str(title).lower())
candidates.extend(title_tokens)
normalized = {
item.strip().lower() for item in candidates
if item and len(item.strip()) >= 4 and not item.strip().isdigit()
}
for topic in normalized:
for topic in self._extract_topics_from_document(doc):
topic_counter[topic] += 1
total_docs = max(1, len(documents))
@@ -355,6 +342,63 @@ class StrategyArchitectAgent(SIFBaseAgent):
if count >= 2
}
def _infer_document_role(self, metadata: Dict[str, Any]) -> str:
    """Infer whether a document belongs to user content or competitor content.

    The ``type``, ``doc_type``, ``content_type``, ``source`` and ``origin``
    metadata fields are lower-cased, joined into a single string and scanned
    for role markers by substring match. Competitor markers are checked
    first, so a document carrying both kinds of marker is reported as a
    competitor.

    Args:
        metadata: Document metadata mapping. ``None`` or an empty mapping is
            treated as "no signals" instead of raising.

    Returns:
        ``"competitor"``, ``"user"`` or ``"unknown"``.
    """
    # Callers typically pass doc.get("metadata", {}), which yields None when
    # the key is present with an explicit null value.
    if not metadata:
        return "unknown"

    signal_keys = ("type", "doc_type", "content_type", "source", "origin")
    signal_blob = " ".join(
        str(value).lower()
        for value in (metadata.get(key) for key in signal_keys)
        if value
    )

    competitor_tokens = ("competitor", "rival", "market_peer")
    user_tokens = ("user", "owned", "first_party", "customer_site")

    # Order matters: competitor markers win over user markers.
    if any(token in signal_blob for token in competitor_tokens):
        return "competitor"
    if any(token in signal_blob for token in user_tokens):
        return "user"
    return "unknown"
def _extract_topics_from_document(self, doc: Dict[str, Any]) -> List[str]:
    """Extract normalized topic labels from metadata and lightweight text fields.

    Candidates are gathered from the topic-like metadata keys (list values
    are taken item by item, string values are split on ``,``, ``|`` and
    ``/``) and from word tokens of the metadata ``title``. When there is no
    title, the first 160 characters of ``doc["text"]`` are tokenized instead.

    A candidate is kept when, after stripping whitespace, it has at least
    four characters, is not purely digits and is not a stopword. Kept
    labels are lower-cased and de-duplicated.

    Args:
        doc: Indexed document. A missing or ``None`` ``metadata`` / ``text``
            entry is treated as empty.

    Returns:
        Alphabetically sorted list of unique topic labels.
    """
    # `or {}` also covers an explicit None stored under "metadata".
    metadata = doc.get("metadata") or {}
    candidates: List[str] = []

    topic_keys = (
        "topics", "topic", "themes", "theme", "keywords",
        "keyword", "tags", "category", "categories",
    )
    for key in topic_keys:
        value = metadata.get(key)
        if isinstance(value, list):
            candidates.extend(str(v) for v in value if v)
        elif isinstance(value, str) and value.strip():
            candidates.extend(re.split(r"[,|/]", value))

    # Fall back to the leading slice of the body text when no title is set.
    title = metadata.get("title") or (doc.get("text") or "")[:160]
    if title:
        candidates.extend(re.findall(r"[a-zA-Z][a-zA-Z\-]{3,}", str(title).lower()))

    stopwords = {
        "with", "from", "that", "this", "your", "about", "into", "using", "guide", "best",
        "tips", "what", "when", "where", "how", "the", "and", "for", "2024", "2025"
    }

    normalized = set()
    for item in candidates:
        stripped = item.strip()
        # Length and digit checks run on the stripped form, before lower-casing.
        if len(stripped) < 4 or stripped.isdigit():
            continue
        label = stripped.lower()
        if label not in stopwords:
            normalized.add(label)
    return sorted(normalized)
def _map_topic_to_doc_titles(self, documents: List[Dict[str, Any]]) -> Dict[str, List[str]]:
    """Group document titles under every topic extracted from the document.

    The title is the metadata ``title``; failing that, the first 100
    characters of the document ``text``; failing that, ``"Untitled"``.
    Topics come from ``self._extract_topics_from_document``.

    Returns:
        Dict keyed by topic; each value lists the titles of the documents
        that yielded that topic, in input order.
    """
    titles_by_topic: Dict[str, List[str]] = {}
    for document in documents:
        meta = document.get("metadata", {})
        label = str(meta.get("title") or document.get("text", "")[:100] or "Untitled")
        for topic in self._extract_topics_from_document(document):
            if topic in titles_by_topic:
                titles_by_topic[topic].append(label)
            else:
                titles_by_topic[topic] = [label]
    return titles_by_topic
def _sample_titles_for_topic(self, documents: List[Dict[str, Any]], topic: str, limit: int = 3) -> List[str]:
"""Return sample titles for a topic."""
samples = []

View File

@@ -209,7 +209,7 @@ class StrategyArchitectAgent(SIFBaseAgent):
# Simple confidence based on cluster size - larger clusters are more reliable
return min(1.0, len(cluster_indices) / 10.0)
async def find_semantic_gaps(self, competitor_indices: List[int]) -> List[Dict[str, Any]]:
async def find_semantic_gaps(self, competitor_indices: List[Any]) -> List[Dict[str, Any]]:
"""Compare user content vs competitor content to find missing topics."""
self._log_agent_operation("Finding semantic content gaps", competitor_count=len(competitor_indices))
@@ -221,14 +221,19 @@ class StrategyArchitectAgent(SIFBaseAgent):
competitor_docs, user_docs = [], []
allowed_competitor_ids = set(str(idx) for idx in competitor_indices) if competitor_indices else None
if allowed_competitor_ids:
for idx in competitor_indices:
if isinstance(idx, int) and 0 <= idx < len(documents):
allowed_competitor_ids.add(str(documents[idx].get("id", "")))
for doc in documents:
metadata = doc.get("metadata", {})
doc_type = str(metadata.get("type", "")).lower()
if "competitor" in doc_type:
role = self._infer_document_role(metadata)
if role == "competitor":
if allowed_competitor_ids and str(doc.get("id")) not in allowed_competitor_ids:
continue
competitor_docs.append(doc)
elif "user" in doc_type:
elif role == "user":
user_docs.append(doc)
if not competitor_docs or not user_docs:
@@ -240,19 +245,21 @@ class StrategyArchitectAgent(SIFBaseAgent):
competitor_topics = self._extract_topic_density(competitor_docs)
user_topics = self._extract_topic_density(user_docs)
competitor_topic_docs = self._map_topic_to_doc_titles(competitor_docs)
user_topic_docs = self._map_topic_to_doc_titles(user_docs)
gaps = []
for topic, competitor_density in competitor_topics.items():
user_density = user_topics.get(topic, 0.0)
density_gap = competitor_density - user_density
if density_gap <= 0.08:
coverage_delta = competitor_density - user_density
if coverage_delta <= 0.08:
continue
confidence = max(
0.0,
min(1.0, 0.35 + (density_gap * 1.5) + (competitor_density * 0.4))
)
priority = "high" if confidence >= 0.75 else "medium" if confidence >= 0.5 else "low"
competitor_support = len(competitor_topic_docs.get(topic, []))
user_support = len(user_topic_docs.get(topic, []))
confidence = max(0.0, min(1.0, (coverage_delta * 0.65) + (min(1.0, competitor_support / 4) * 0.35)))
severity_score = max(0.0, min(1.0, (coverage_delta * 0.7) + (confidence * 0.3)))
priority = "high" if severity_score >= 0.72 else "medium" if severity_score >= 0.45 else "low"
gaps.append({
"topic": topic,
"priority": priority,
@@ -261,14 +268,18 @@ class StrategyArchitectAgent(SIFBaseAgent):
f"(density {competitor_density:.2f} vs {user_density:.2f})."
),
"confidence": round(confidence, 3),
"severity_score": round(severity_score, 3),
"coverage_delta": round(coverage_delta, 4),
"topic_density": {
"competitor": round(competitor_density, 4),
"user": round(user_density, 4),
"gap": round(density_gap, 4)
"gap": round(coverage_delta, 4)
},
"evidence": {
"competitor_sample_titles": self._sample_titles_for_topic(competitor_docs, topic),
"user_sample_titles": self._sample_titles_for_topic(user_docs, topic),
"competitor_supporting_docs": competitor_support,
"user_supporting_docs": user_support,
"competitor_doc_count": len(competitor_docs),
"user_doc_count": len(user_docs)
}
@@ -276,6 +287,7 @@ class StrategyArchitectAgent(SIFBaseAgent):
gaps.sort(
key=lambda item: (
item.get("severity_score", 0),
item.get("confidence", 0),
item.get("topic_density", {}).get("gap", 0)
),
@@ -355,25 +367,7 @@ class StrategyArchitectAgent(SIFBaseAgent):
topic_counter: Counter = Counter()
for doc in documents:
metadata = doc.get("metadata", {})
candidates = []
for key in ("topics", "topic", "keywords", "keyword", "tags", "category"):
value = metadata.get(key)
if isinstance(value, list):
candidates.extend([str(v) for v in value if v])
elif isinstance(value, str) and value.strip():
candidates.extend(re.split(r"[,|/]", value))
title = metadata.get("title") or doc.get("text", "")[:120]
if title:
title_tokens = re.findall(r"[a-zA-Z][a-zA-Z\-]{3,}", str(title).lower())
candidates.extend(title_tokens)
normalized = {
item.strip().lower() for item in candidates
if item and len(item.strip()) >= 4 and not item.strip().isdigit()
}
for topic in normalized:
for topic in self._extract_topics_from_document(doc):
topic_counter[topic] += 1
total_docs = max(1, len(documents))
@@ -383,6 +377,63 @@ class StrategyArchitectAgent(SIFBaseAgent):
if count >= 2
}
def _infer_document_role(self, metadata: Dict[str, Any]) -> str:
    """Infer whether a document belongs to user content or competitor content.

    The ``type``, ``doc_type``, ``content_type``, ``source`` and ``origin``
    metadata fields are lower-cased, joined into a single string and scanned
    for role markers by substring match. Competitor markers are checked
    first, so a document carrying both kinds of marker is reported as a
    competitor.

    Args:
        metadata: Document metadata mapping. ``None`` or an empty mapping is
            treated as "no signals" instead of raising.

    Returns:
        ``"competitor"``, ``"user"`` or ``"unknown"``.
    """
    # Callers typically pass doc.get("metadata", {}), which yields None when
    # the key is present with an explicit null value.
    if not metadata:
        return "unknown"

    signal_keys = ("type", "doc_type", "content_type", "source", "origin")
    signal_blob = " ".join(
        str(value).lower()
        for value in (metadata.get(key) for key in signal_keys)
        if value
    )

    competitor_tokens = ("competitor", "rival", "market_peer")
    user_tokens = ("user", "owned", "first_party", "customer_site")

    # Order matters: competitor markers win over user markers.
    if any(token in signal_blob for token in competitor_tokens):
        return "competitor"
    if any(token in signal_blob for token in user_tokens):
        return "user"
    return "unknown"
def _extract_topics_from_document(self, doc: Dict[str, Any]) -> List[str]:
    """Extract normalized topic labels from metadata and lightweight text fields.

    Candidates are gathered from the topic-like metadata keys (list values
    are taken item by item, string values are split on ``,``, ``|`` and
    ``/``) and from word tokens of the metadata ``title``. When there is no
    title, the first 160 characters of ``doc["text"]`` are tokenized instead.

    A candidate is kept when, after stripping whitespace, it has at least
    four characters, is not purely digits and is not a stopword. Kept
    labels are lower-cased and de-duplicated.

    Args:
        doc: Indexed document. A missing or ``None`` ``metadata`` / ``text``
            entry is treated as empty.

    Returns:
        Alphabetically sorted list of unique topic labels.
    """
    # `or {}` also covers an explicit None stored under "metadata".
    metadata = doc.get("metadata") or {}
    candidates: List[str] = []

    topic_keys = (
        "topics", "topic", "themes", "theme", "keywords",
        "keyword", "tags", "category", "categories",
    )
    for key in topic_keys:
        value = metadata.get(key)
        if isinstance(value, list):
            candidates.extend(str(v) for v in value if v)
        elif isinstance(value, str) and value.strip():
            candidates.extend(re.split(r"[,|/]", value))

    # Fall back to the leading slice of the body text when no title is set.
    title = metadata.get("title") or (doc.get("text") or "")[:160]
    if title:
        candidates.extend(re.findall(r"[a-zA-Z][a-zA-Z\-]{3,}", str(title).lower()))

    stopwords = {
        "with", "from", "that", "this", "your", "about", "into", "using", "guide", "best",
        "tips", "what", "when", "where", "how", "the", "and", "for", "2024", "2025"
    }

    normalized = set()
    for item in candidates:
        stripped = item.strip()
        # Length and digit checks run on the stripped form, before lower-casing.
        if len(stripped) < 4 or stripped.isdigit():
            continue
        label = stripped.lower()
        if label not in stopwords:
            normalized.add(label)
    return sorted(normalized)
def _map_topic_to_doc_titles(self, documents: List[Dict[str, Any]]) -> Dict[str, List[str]]:
    """Group document titles under every topic extracted from the document.

    The title is the metadata ``title``; failing that, the first 100
    characters of the document ``text``; failing that, ``"Untitled"``.
    Topics come from ``self._extract_topics_from_document``.

    Returns:
        Dict keyed by topic; each value lists the titles of the documents
        that yielded that topic, in input order.
    """
    titles_by_topic: Dict[str, List[str]] = {}
    for document in documents:
        meta = document.get("metadata", {})
        label = str(meta.get("title") or document.get("text", "")[:100] or "Untitled")
        for topic in self._extract_topics_from_document(document):
            if topic in titles_by_topic:
                titles_by_topic[topic].append(label)
            else:
                titles_by_topic[topic] = [label]
    return titles_by_topic
def _sample_titles_for_topic(self, documents: List[Dict[str, Any]], topic: str, limit: int = 3) -> List[str]:
"""Return sample titles for a topic."""
samples = []

View File

@@ -153,10 +153,16 @@ class SIFOnboardingIntegration:
content_pillars = await self.strategy_agent.discover_pillars()
# Find semantic gaps (what competitors cover that user doesn't)
semantic_gaps = await self.strategy_agent.find_semantic_gaps(competitor_indices=[])
indexed_documents = await self.strategy_agent._fetch_index_documents()
competitor_doc_ids = [
str(doc.get("id", ""))
for doc in indexed_documents
if self.strategy_agent._infer_document_role(doc.get("metadata", {})) == "competitor"
]
semantic_gaps = await self.strategy_agent.find_semantic_gaps(competitor_indices=competitor_doc_ids)
# Analyze content themes and topics
themes_analysis = await self._analyze_content_themes(user_content, competitor_content)
themes_analysis = await self._analyze_content_themes(indexed_documents)
# Generate strategic recommendations
recommendations = await self._generate_strategic_recommendations(
@@ -185,47 +191,65 @@ class SIFOnboardingIntegration:
"error": str(e)
}
async def _analyze_content_themes(self, user_content: List[Dict], competitor_content: List[Dict]) -> Optional[Dict[str, Any]]:
"""Analyze content themes and topics using semantic search."""
async def _analyze_content_themes(self, indexed_documents: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
"""Analyze themes from indexed metadata instead of static literals."""
logger.info("[SIFOnboarding] Analyzing content themes")
try:
# Combine all content for theme analysis
all_content = user_content + competitor_content
if not all_content:
if not indexed_documents:
return None
# Extract key themes using semantic search
themes = []
theme_queries = [
"digital marketing strategies",
"content marketing best practices",
"SEO optimization techniques",
"social media marketing",
"email marketing campaigns",
"brand positioning and messaging"
user_docs = [
doc for doc in indexed_documents
if self.strategy_agent._infer_document_role(doc.get("metadata", {})) == "user"
]
for query in theme_queries:
results = await self.intelligence.search(query, limit=3)
if results:
themes.append({
"theme": query,
"relevance_score": results[0].get("score", 0) if results else 0,
"top_result": results[0] if results else None
})
# Sort themes by relevance
themes.sort(key=lambda x: x["relevance_score"], reverse=True)
competitor_docs = [
doc for doc in indexed_documents
if self.strategy_agent._infer_document_role(doc.get("metadata", {})) == "competitor"
]
if not user_docs and not competitor_docs:
return None
user_theme_density = self.strategy_agent._extract_topic_density(user_docs)
competitor_theme_density = self.strategy_agent._extract_topic_density(competitor_docs)
all_topics = set(user_theme_density) | set(competitor_theme_density)
ranked_themes = []
for topic in all_topics:
user_score = user_theme_density.get(topic, 0.0)
competitor_score = competitor_theme_density.get(topic, 0.0)
ranked_themes.append({
"theme": topic,
"user_density": round(user_score, 4),
"competitor_density": round(competitor_score, 4),
"combined_relevance": round((user_score + competitor_score) / 2, 4),
"coverage_delta": round(competitor_score - user_score, 4),
"classification": (
"competitor_led"
if competitor_score > user_score + 0.05
else "user_led"
if user_score > competitor_score + 0.05
else "shared"
),
"evidence": {
"user_sample_titles": self.strategy_agent._sample_titles_for_topic(user_docs, topic),
"competitor_sample_titles": self.strategy_agent._sample_titles_for_topic(competitor_docs, topic)
}
})
ranked_themes.sort(
key=lambda item: (item["combined_relevance"], abs(item["coverage_delta"])),
reverse=True
)
return {
"top_themes": themes[:5],
"total_themes_analyzed": len(themes),
"user_content_themes": [t for t in themes if any(t["theme"] in page.get("content", "") for page in user_content)],
"competitor_content_themes": [t for t in themes if any(t["theme"] in page.get("content", "") for page in competitor_content)]
"top_themes": ranked_themes[:8],
"total_themes_analyzed": len(ranked_themes),
"user_theme_count": len(user_theme_density),
"competitor_theme_count": len(competitor_theme_density),
"theme_source": "indexed_metadata"
}
except Exception as e:
logger.error(f"[SIFOnboarding] Theme analysis failed: {e}")
return None