Implement evidence-driven semantic gap detection

This commit is contained in:
ي
2026-03-01 21:58:44 +05:30
parent f8f7ddeb2a
commit 77088bfc53
4 changed files with 412 additions and 77 deletions

View File

@@ -7,6 +7,8 @@ Each agent leverages TxtaiIntelligenceService for semantic operations.
import traceback
import json
import asyncio
import re
from collections import Counter
from typing import List, Dict, Any, Optional
from datetime import datetime
from loguru import logger
@@ -177,23 +179,193 @@ class StrategyArchitectAgent(SIFBaseAgent):
self._log_agent_operation("Finding semantic content gaps", competitor_count=len(competitor_indices))
try:
# STUB: Implement cross-index comparison
# This would involve:
# 1. Getting user content topics/themes
# 2. Getting competitor content topics/themes
# 3. Finding topics competitors cover but user doesn't
logger.info(f"[{self.__class__.__name__}] Found semantic gaps analysis stub")
return [
{"topic": "Topic A", "priority": "high", "reason": "Competitor coverage gap"},
{"topic": "Topic B", "priority": "medium", "reason": "Emerging trend"}
]
documents = await self._fetch_index_documents()
if not documents:
logger.info(f"[{self.__class__.__name__}] No indexed documents available for gap detection")
return []
competitor_docs, user_docs = [], []
allowed_competitor_ids = set(str(idx) for idx in competitor_indices) if competitor_indices else None
for doc in documents:
metadata = doc.get("metadata", {})
doc_type = str(metadata.get("type", "")).lower()
if "competitor" in doc_type:
if allowed_competitor_ids and str(doc.get("id")) not in allowed_competitor_ids:
continue
competitor_docs.append(doc)
elif "user" in doc_type:
user_docs.append(doc)
if not competitor_docs or not user_docs:
logger.info(
f"[{self.__class__.__name__}] Insufficient split for gap analysis: "
f"user_docs={len(user_docs)}, competitor_docs={len(competitor_docs)}"
)
return []
competitor_topics = self._extract_topic_density(competitor_docs)
user_topics = self._extract_topic_density(user_docs)
gaps = []
for topic, competitor_density in competitor_topics.items():
user_density = user_topics.get(topic, 0.0)
density_gap = competitor_density - user_density
if density_gap <= 0.08:
continue
confidence = max(
0.0,
min(1.0, 0.35 + (density_gap * 1.5) + (competitor_density * 0.4))
)
priority = "high" if confidence >= 0.75 else "medium" if confidence >= 0.5 else "low"
gaps.append({
"topic": topic,
"priority": priority,
"reason": (
f"Competitors mention '{topic}' substantially more often "
f"(density {competitor_density:.2f} vs {user_density:.2f})."
),
"confidence": round(confidence, 3),
"topic_density": {
"competitor": round(competitor_density, 4),
"user": round(user_density, 4),
"gap": round(density_gap, 4)
},
"evidence": {
"competitor_sample_titles": self._sample_titles_for_topic(competitor_docs, topic),
"user_sample_titles": self._sample_titles_for_topic(user_docs, topic),
"competitor_doc_count": len(competitor_docs),
"user_doc_count": len(user_docs)
}
})
gaps.sort(
key=lambda item: (
item.get("confidence", 0),
item.get("topic_density", {}).get("gap", 0)
),
reverse=True
)
return gaps[:12]
except Exception as e:
logger.error(f"[{self.__class__.__name__}] Failed to find semantic gaps: {e}")
logger.error(f"[{self.__class__.__name__}] Full traceback: {traceback.format_exc()}")
return []
async def _fetch_index_documents(self) -> List[Dict[str, Any]]:
"""Fetch indexed documents and normalize metadata from txtai result objects."""
if not self.intelligence.is_initialized() or not self.intelligence.embeddings:
return []
embeddings = self.intelligence.embeddings
limit = 0
if hasattr(embeddings, "count"):
try:
limit = int(embeddings.count())
except Exception:
limit = 0
documents = []
candidate_queries = []
if limit > 0:
candidate_queries.extend([
f"select id, text, object from txtai limit {limit}",
f"select id, text, tags from txtai limit {limit}"
])
candidate_queries.extend(["marketing", "content", "seo", "strategy", "social media"])
seen_ids = set()
for query in candidate_queries:
try:
query_limit = limit if query.startswith("select") and limit > 0 else max(10, limit or 50)
rows = embeddings.search(query, limit=query_limit)
except Exception:
continue
for row in rows or []:
doc_id = str(row.get("id", ""))
dedupe_key = doc_id or str(hash(f"{row.get('text','')}::{row.get('score',0)}"))
if dedupe_key in seen_ids:
continue
seen_ids.add(dedupe_key)
documents.append({
"id": doc_id,
"text": row.get("text", "") or "",
"metadata": self._normalize_metadata(row)
})
if limit > 0 and len(documents) >= limit:
break
return documents
def _normalize_metadata(self, row: Dict[str, Any]) -> Dict[str, Any]:
"""Normalize metadata payloads from txtai search rows."""
for key in ("object", "tags", "metadata", "meta"):
payload = row.get(key)
if isinstance(payload, dict):
return payload
if isinstance(payload, str):
try:
parsed = json.loads(payload)
if isinstance(parsed, dict):
return parsed
except Exception:
continue
return {}
def _extract_topic_density(self, documents: List[Dict[str, Any]]) -> Dict[str, float]:
"""Extract topic density from document metadata and titles."""
topic_counter: Counter = Counter()
for doc in documents:
metadata = doc.get("metadata", {})
candidates = []
for key in ("topics", "topic", "keywords", "keyword", "tags", "category"):
value = metadata.get(key)
if isinstance(value, list):
candidates.extend([str(v) for v in value if v])
elif isinstance(value, str) and value.strip():
candidates.extend(re.split(r"[,|/]", value))
title = metadata.get("title") or doc.get("text", "")[:120]
if title:
title_tokens = re.findall(r"[a-zA-Z][a-zA-Z\-]{3,}", str(title).lower())
candidates.extend(title_tokens)
normalized = {
item.strip().lower() for item in candidates
if item and len(item.strip()) >= 4 and not item.strip().isdigit()
}
for topic in normalized:
topic_counter[topic] += 1
total_docs = max(1, len(documents))
return {
topic: count / total_docs
for topic, count in topic_counter.items()
if count >= 2
}
def _sample_titles_for_topic(self, documents: List[Dict[str, Any]], topic: str, limit: int = 3) -> List[str]:
"""Return sample titles for a topic."""
samples = []
topic_lower = topic.lower()
for doc in documents:
metadata = doc.get("metadata", {})
title = metadata.get("title") or doc.get("text", "")[:100]
if not title:
continue
haystack = f"{title} {json.dumps(metadata, default=str)}".lower()
if topic_lower in haystack:
samples.append(str(title))
if len(samples) >= limit:
break
return samples
class ContentGuardianAgent(SIFBaseAgent):
"""Agent for preventing cannibalization and ensuring content originality."""
@@ -2436,4 +2608,3 @@ class SocialAmplificationAgent(BaseALwrityAgent):
"status": "scheduled",
"timestamp": datetime.utcnow().isoformat()
}