Implement evidence-based semantic gap detection for strategy agents

This commit is contained in:
ي
2026-03-02 11:42:52 +05:30
parent fe6d3b6c66
commit cd9ffb5ef5
3 changed files with 226 additions and 107 deletions

View File

@@ -174,7 +174,7 @@ class StrategyArchitectAgent(SIFBaseAgent):
return proposals
async def find_semantic_gaps(self, competitor_indices: List[int]) -> List[Dict[str, Any]]:
async def find_semantic_gaps(self, competitor_indices: List[Any]) -> List[Dict[str, Any]]:
"""Compare user content vs competitor content to find missing topics."""
self._log_agent_operation("Finding semantic content gaps", competitor_count=len(competitor_indices))
@@ -186,14 +186,19 @@ class StrategyArchitectAgent(SIFBaseAgent):
competitor_docs, user_docs = [], []
allowed_competitor_ids = set(str(idx) for idx in competitor_indices) if competitor_indices else None
if allowed_competitor_ids:
for idx in competitor_indices:
if isinstance(idx, int) and 0 <= idx < len(documents):
allowed_competitor_ids.add(str(documents[idx].get("id", "")))
for doc in documents:
metadata = doc.get("metadata", {})
doc_type = str(metadata.get("type", "")).lower()
if "competitor" in doc_type:
role = self._infer_document_role(metadata)
if role == "competitor":
if allowed_competitor_ids and str(doc.get("id")) not in allowed_competitor_ids:
continue
competitor_docs.append(doc)
elif "user" in doc_type:
elif role == "user":
user_docs.append(doc)
if not competitor_docs or not user_docs:
@@ -203,28 +208,23 @@ class StrategyArchitectAgent(SIFBaseAgent):
)
return []
# FIX: Ensure we correctly map indices to documents if indices were passed as integers
# The filter allowed_competitor_ids uses str(idx) but if competitor_indices contained
# positional indices instead of IDs, we might have filtered everything out.
# In this implementation, we assume competitor_indices are doc IDs.
# If they are positional, we need a way to map them.
# For now, we trust the caller passed IDs.
competitor_topics = self._extract_topic_density(competitor_docs)
user_topics = self._extract_topic_density(user_docs)
competitor_topic_docs = self._map_topic_to_doc_titles(competitor_docs)
user_topic_docs = self._map_topic_to_doc_titles(user_docs)
gaps = []
for topic, competitor_density in competitor_topics.items():
user_density = user_topics.get(topic, 0.0)
density_gap = competitor_density - user_density
if density_gap <= 0.08:
coverage_delta = competitor_density - user_density
if coverage_delta <= 0.08:
continue
confidence = max(
0.0,
min(1.0, 0.35 + (density_gap * 1.5) + (competitor_density * 0.4))
)
priority = "high" if confidence >= 0.75 else "medium" if confidence >= 0.5 else "low"
competitor_support = len(competitor_topic_docs.get(topic, []))
user_support = len(user_topic_docs.get(topic, []))
confidence = max(0.0, min(1.0, (coverage_delta * 0.65) + (min(1.0, competitor_support / 4) * 0.35)))
severity_score = max(0.0, min(1.0, (coverage_delta * 0.7) + (confidence * 0.3)))
priority = "high" if severity_score >= 0.72 else "medium" if severity_score >= 0.45 else "low"
gaps.append({
"topic": topic,
"priority": priority,
@@ -233,14 +233,18 @@ class StrategyArchitectAgent(SIFBaseAgent):
f"(density {competitor_density:.2f} vs {user_density:.2f})."
),
"confidence": round(confidence, 3),
"severity_score": round(severity_score, 3),
"coverage_delta": round(coverage_delta, 4),
"topic_density": {
"competitor": round(competitor_density, 4),
"user": round(user_density, 4),
"gap": round(density_gap, 4)
"gap": round(coverage_delta, 4)
},
"evidence": {
"competitor_sample_titles": self._sample_titles_for_topic(competitor_docs, topic),
"user_sample_titles": self._sample_titles_for_topic(user_docs, topic),
"competitor_supporting_docs": competitor_support,
"user_supporting_docs": user_support,
"competitor_doc_count": len(competitor_docs),
"user_doc_count": len(user_docs)
}
@@ -248,6 +252,7 @@ class StrategyArchitectAgent(SIFBaseAgent):
gaps.sort(
key=lambda item: (
item.get("severity_score", 0),
item.get("confidence", 0),
item.get("topic_density", {}).get("gap", 0)
),
@@ -327,25 +332,7 @@ class StrategyArchitectAgent(SIFBaseAgent):
topic_counter: Counter = Counter()
for doc in documents:
metadata = doc.get("metadata", {})
candidates = []
for key in ("topics", "topic", "keywords", "keyword", "tags", "category"):
value = metadata.get(key)
if isinstance(value, list):
candidates.extend([str(v) for v in value if v])
elif isinstance(value, str) and value.strip():
candidates.extend(re.split(r"[,|/]", value))
title = metadata.get("title") or doc.get("text", "")[:120]
if title:
title_tokens = re.findall(r"[a-zA-Z][a-zA-Z\-]{3,}", str(title).lower())
candidates.extend(title_tokens)
normalized = {
item.strip().lower() for item in candidates
if item and len(item.strip()) >= 4 and not item.strip().isdigit()
}
for topic in normalized:
for topic in self._extract_topics_from_document(doc):
topic_counter[topic] += 1
total_docs = max(1, len(documents))
@@ -355,6 +342,63 @@ class StrategyArchitectAgent(SIFBaseAgent):
if count >= 2
}
def _infer_document_role(self, metadata: Dict[str, Any]) -> str:
"""Infer whether a document belongs to user content or competitor content."""
signals = [
metadata.get("type", ""),
metadata.get("doc_type", ""),
metadata.get("content_type", ""),
metadata.get("source", ""),
metadata.get("origin", "")
]
signal_blob = " ".join(str(item).lower() for item in signals if item)
if any(token in signal_blob for token in ("competitor", "rival", "market_peer")):
return "competitor"
if any(token in signal_blob for token in ("user", "owned", "first_party", "customer_site")):
return "user"
return "unknown"
def _extract_topics_from_document(self, doc: Dict[str, Any]) -> List[str]:
"""Extract normalized topic labels from metadata and lightweight text fields."""
metadata = doc.get("metadata", {})
candidates: List[str] = []
for key in ("topics", "topic", "themes", "theme", "keywords", "keyword", "tags", "category", "categories"):
value = metadata.get(key)
if isinstance(value, list):
candidates.extend([str(v) for v in value if v])
elif isinstance(value, str) and value.strip():
candidates.extend(re.split(r"[,|/]", value))
title = metadata.get("title") or doc.get("text", "")[:160]
if title:
candidates.extend(re.findall(r"[a-zA-Z][a-zA-Z\-]{3,}", str(title).lower()))
stopwords = {
"with", "from", "that", "this", "your", "about", "into", "using", "guide", "best",
"tips", "what", "when", "where", "how", "the", "and", "for", "2024", "2025"
}
normalized = {
item.strip().lower()
for item in candidates
if item
and len(item.strip()) >= 4
and not item.strip().isdigit()
and item.strip().lower() not in stopwords
}
return sorted(normalized)
def _map_topic_to_doc_titles(self, documents: List[Dict[str, Any]]) -> Dict[str, List[str]]:
"""Map each topic to a list of document titles that support it."""
mapping: Dict[str, List[str]] = {}
for doc in documents:
metadata = doc.get("metadata", {})
title = str(metadata.get("title") or doc.get("text", "")[:100] or "Untitled")
for topic in self._extract_topics_from_document(doc):
mapping.setdefault(topic, []).append(title)
return mapping
def _sample_titles_for_topic(self, documents: List[Dict[str, Any]], topic: str, limit: int = 3) -> List[str]:
"""Return sample titles for a topic."""
samples = []

View File

@@ -209,7 +209,7 @@ class StrategyArchitectAgent(SIFBaseAgent):
# Simple confidence based on cluster size - larger clusters are more reliable
return min(1.0, len(cluster_indices) / 10.0)
async def find_semantic_gaps(self, competitor_indices: List[int]) -> List[Dict[str, Any]]:
async def find_semantic_gaps(self, competitor_indices: List[Any]) -> List[Dict[str, Any]]:
"""Compare user content vs competitor content to find missing topics."""
self._log_agent_operation("Finding semantic content gaps", competitor_count=len(competitor_indices))
@@ -221,14 +221,19 @@ class StrategyArchitectAgent(SIFBaseAgent):
competitor_docs, user_docs = [], []
allowed_competitor_ids = set(str(idx) for idx in competitor_indices) if competitor_indices else None
if allowed_competitor_ids:
for idx in competitor_indices:
if isinstance(idx, int) and 0 <= idx < len(documents):
allowed_competitor_ids.add(str(documents[idx].get("id", "")))
for doc in documents:
metadata = doc.get("metadata", {})
doc_type = str(metadata.get("type", "")).lower()
if "competitor" in doc_type:
role = self._infer_document_role(metadata)
if role == "competitor":
if allowed_competitor_ids and str(doc.get("id")) not in allowed_competitor_ids:
continue
competitor_docs.append(doc)
elif "user" in doc_type:
elif role == "user":
user_docs.append(doc)
if not competitor_docs or not user_docs:
@@ -240,19 +245,21 @@ class StrategyArchitectAgent(SIFBaseAgent):
competitor_topics = self._extract_topic_density(competitor_docs)
user_topics = self._extract_topic_density(user_docs)
competitor_topic_docs = self._map_topic_to_doc_titles(competitor_docs)
user_topic_docs = self._map_topic_to_doc_titles(user_docs)
gaps = []
for topic, competitor_density in competitor_topics.items():
user_density = user_topics.get(topic, 0.0)
density_gap = competitor_density - user_density
if density_gap <= 0.08:
coverage_delta = competitor_density - user_density
if coverage_delta <= 0.08:
continue
confidence = max(
0.0,
min(1.0, 0.35 + (density_gap * 1.5) + (competitor_density * 0.4))
)
priority = "high" if confidence >= 0.75 else "medium" if confidence >= 0.5 else "low"
competitor_support = len(competitor_topic_docs.get(topic, []))
user_support = len(user_topic_docs.get(topic, []))
confidence = max(0.0, min(1.0, (coverage_delta * 0.65) + (min(1.0, competitor_support / 4) * 0.35)))
severity_score = max(0.0, min(1.0, (coverage_delta * 0.7) + (confidence * 0.3)))
priority = "high" if severity_score >= 0.72 else "medium" if severity_score >= 0.45 else "low"
gaps.append({
"topic": topic,
"priority": priority,
@@ -261,14 +268,18 @@ class StrategyArchitectAgent(SIFBaseAgent):
f"(density {competitor_density:.2f} vs {user_density:.2f})."
),
"confidence": round(confidence, 3),
"severity_score": round(severity_score, 3),
"coverage_delta": round(coverage_delta, 4),
"topic_density": {
"competitor": round(competitor_density, 4),
"user": round(user_density, 4),
"gap": round(density_gap, 4)
"gap": round(coverage_delta, 4)
},
"evidence": {
"competitor_sample_titles": self._sample_titles_for_topic(competitor_docs, topic),
"user_sample_titles": self._sample_titles_for_topic(user_docs, topic),
"competitor_supporting_docs": competitor_support,
"user_supporting_docs": user_support,
"competitor_doc_count": len(competitor_docs),
"user_doc_count": len(user_docs)
}
@@ -276,6 +287,7 @@ class StrategyArchitectAgent(SIFBaseAgent):
gaps.sort(
key=lambda item: (
item.get("severity_score", 0),
item.get("confidence", 0),
item.get("topic_density", {}).get("gap", 0)
),
@@ -355,25 +367,7 @@ class StrategyArchitectAgent(SIFBaseAgent):
topic_counter: Counter = Counter()
for doc in documents:
metadata = doc.get("metadata", {})
candidates = []
for key in ("topics", "topic", "keywords", "keyword", "tags", "category"):
value = metadata.get(key)
if isinstance(value, list):
candidates.extend([str(v) for v in value if v])
elif isinstance(value, str) and value.strip():
candidates.extend(re.split(r"[,|/]", value))
title = metadata.get("title") or doc.get("text", "")[:120]
if title:
title_tokens = re.findall(r"[a-zA-Z][a-zA-Z\-]{3,}", str(title).lower())
candidates.extend(title_tokens)
normalized = {
item.strip().lower() for item in candidates
if item and len(item.strip()) >= 4 and not item.strip().isdigit()
}
for topic in normalized:
for topic in self._extract_topics_from_document(doc):
topic_counter[topic] += 1
total_docs = max(1, len(documents))
@@ -383,6 +377,63 @@ class StrategyArchitectAgent(SIFBaseAgent):
if count >= 2
}
def _infer_document_role(self, metadata: Dict[str, Any]) -> str:
"""Infer whether a document belongs to user content or competitor content."""
signals = [
metadata.get("type", ""),
metadata.get("doc_type", ""),
metadata.get("content_type", ""),
metadata.get("source", ""),
metadata.get("origin", "")
]
signal_blob = " ".join(str(item).lower() for item in signals if item)
if any(token in signal_blob for token in ("competitor", "rival", "market_peer")):
return "competitor"
if any(token in signal_blob for token in ("user", "owned", "first_party", "customer_site")):
return "user"
return "unknown"
def _extract_topics_from_document(self, doc: Dict[str, Any]) -> List[str]:
"""Extract normalized topic labels from metadata and lightweight text fields."""
metadata = doc.get("metadata", {})
candidates: List[str] = []
for key in ("topics", "topic", "themes", "theme", "keywords", "keyword", "tags", "category", "categories"):
value = metadata.get(key)
if isinstance(value, list):
candidates.extend([str(v) for v in value if v])
elif isinstance(value, str) and value.strip():
candidates.extend(re.split(r"[,|/]", value))
title = metadata.get("title") or doc.get("text", "")[:160]
if title:
candidates.extend(re.findall(r"[a-zA-Z][a-zA-Z\-]{3,}", str(title).lower()))
stopwords = {
"with", "from", "that", "this", "your", "about", "into", "using", "guide", "best",
"tips", "what", "when", "where", "how", "the", "and", "for", "2024", "2025"
}
normalized = {
item.strip().lower()
for item in candidates
if item
and len(item.strip()) >= 4
and not item.strip().isdigit()
and item.strip().lower() not in stopwords
}
return sorted(normalized)
def _map_topic_to_doc_titles(self, documents: List[Dict[str, Any]]) -> Dict[str, List[str]]:
"""Map each topic to a list of document titles that support it."""
mapping: Dict[str, List[str]] = {}
for doc in documents:
metadata = doc.get("metadata", {})
title = str(metadata.get("title") or doc.get("text", "")[:100] or "Untitled")
for topic in self._extract_topics_from_document(doc):
mapping.setdefault(topic, []).append(title)
return mapping
def _sample_titles_for_topic(self, documents: List[Dict[str, Any]], topic: str, limit: int = 3) -> List[str]:
"""Return sample titles for a topic."""
samples = []