Files
ALwrity/backend/services/intelligence/sif_agents.py
ajaysi 923fa671fe feat: ContentGuardianAgent, onboarding UX, Team Activity action wiring, docs, agent help modal
ContentGuardianAgent consolidation:
- Merge 3 duplicate classes into single source in specialized/content_guardian.py
- Watchdog audit_committee() with heuristic scoring, coverage gaps, overlaps, alerts
- Remove misleading rejection_rate() helper; use acceptance_rate directly
- Integrate audit + alerts + trend signals into today_workflow_service.py

Team Activity page:
- QualityAuditPanel: health ring, per-agent critiques, coverage gaps, overlaps
- TrendSignalsPanel: opportunity cards with urgency/impact/coverage bars
- AlertBanner: persistent dismiss via POST /alerts/{id}/mark-read
- AgentHelpModal: dialog showing all 8 agents with descriptions, tools, schedule
- QualityAuditPanel action buttons: Fill gap -> /content-planning, Resolve overlap, View CTA on alerts/issues
- TrendSignalsPanel action buttons: Create content from this trend -> /blog-writer with trend context state

Onboarding system:
- Step 4 validation: no auto-pass via basic_ready; requires persona data or explicit progression
- Step 5 validation: logs warning on auto-pass without integration data
- OnboardingCompletionService: single DB session, transactional task creation, upsert pattern
- Business-without-website: nullable website_url on SIFIndexingTask and MarketTrendsTask
- DeepCompetitorAnalysisExecutor: 5-min timeout, 10-competitor cap, asyncio.wait_for
- Persona generation: async with 30s timeout, falls back to scheduler
- OnboardingProgressService.reset_onboarding(): resets session + pauses all DB tasks
- OnboardingControlService.reset_onboarding(): also cancels APScheduler jobs
- FinalStep TaskSchedulingPanel: shows scheduled/failed tasks after completion, 8s auto-redirect
- onboarding_completed agent activity event logged to feed

Documentation:
- docs-site/features/onboarding/: overview, steps, scheduler-tasks, technical-reference (4 pages)
- docs-site/mkdocs.yml: added Onboarding System nav section
- docs-site/features/sif-agents/: overview, agent-directory, committee-system, content-guardian (4 pages)
- docs-site/features/team-activity/: overview, quality-audit, trend-signals, alert-system (4 pages)
- docs-site/features/todays-workflow/: updated overview, technical-architecture, workflow-guide, api-reference
2026-06-01 12:24:31 +05:30

879 lines
37 KiB
Python

"""
SIF Agent Interfaces
Defines the specialized agents for digital marketing and SEO.
Each agent leverages TxtaiIntelligenceService for semantic operations.
"""
import traceback
import json
import asyncio
import re
from collections import Counter
from typing import List, Dict, Any, Optional
from datetime import datetime
from loguru import logger
from .txtai_service import TxtaiIntelligenceService, TXTAI_AVAILABLE
from services.intelligence.agents.core_agent_framework import BaseALwrityAgent
from services.llm_providers.main_text_generation import llm_text_gen
# Optional txtai imports (align with core agent framework)
try:
from txtai import Agent, LLM
except ImportError:
Agent = None
LLM = None
class SharedLLMWrapper:
    """Adapter that lets the shared ALwrity LLM service stand in for a txtai LLM.

    Instances are callable and also expose ``generate``; both send the prompt
    to ``llm_text_gen`` on behalf of ``user_id``.
    """

    def __init__(self, user_id: str, task: Optional[str] = None):
        # ``task`` is stored only; nothing in this wrapper reads it back.
        self.task = task
        self.user_id = user_id

    def __call__(self, prompt: str, **kwargs) -> str:
        """Allow ``wrapper(prompt)`` by delegating to ``generate``."""
        return self.generate(prompt, **kwargs)

    def generate(self, prompt: str, **kwargs) -> str:
        """Generate text for ``prompt`` through the shared LLM provider.

        Extra keyword arguments are accepted for interface compatibility but
        are not forwarded. Any failure is logged and reported to the caller as
        a bracketed error string rather than as an exception.
        """
        try:
            request_options = dict(
                user_id=self.user_id,
                preferred_hf_models=LOW_COST_SHARED_REMOTE_MODELS,
                flow_type="sif_agent",
            )
            generated = llm_text_gen(prompt, **request_options)
        except Exception as e:
            logger.error(f"SharedLLMWrapper failed to generate text: {e}")
            generated = f"[ERROR: Shared LLM generation failed for user {self.user_id}]"
        return generated
# Model identifiers passed as ``preferred_hf_models`` by SharedLLMWrapper.
LOW_COST_SHARED_REMOTE_MODELS: List[str] = [
    "Qwen/Qwen2.5-1.5B-Instruct",
    "Qwen/Qwen2.5-0.5B-Instruct",
    "TinyLlama/TinyLlama-1.1B-Chat-v1.0",
]

# Models LocalLLMWrapper tries, in order, after the requested model fails to load.
LOCAL_LLM_FALLBACKS: List[str] = [
    "Qwen/Qwen2.5-1.5B-Instruct",
    "Qwen/Qwen2.5-0.5B-Instruct",
    "TinyLlama/TinyLlama-1.1B-Chat-v1.0",
]

# Process-wide store of loaded local models, keyed by "<model>:<task>".
_local_llm_cache: Dict[str, Any] = {}
class LocalLLMWrapper:
    """
    Wraps a local txtai LLM with lazy loading and async lifecycle hooks.

    Loaded models are stored in the module-level ``_local_llm_cache`` under
    ``"<model>:<task>"`` keys, so they are shared by every wrapper instance.
    Async callers should use ``initialize`` / ``ensure_initialized_async`` so
    the blocking load runs in a worker thread rather than on the event loop.
    """

    def __init__(self, model_path: str, task: Optional[str] = None):
        self.model_path = model_path
        self.task = task
        self._initialized = False
        self._init_task = None

    def _cache_key_for(self, model_name: str) -> str:
        """Build the shared-cache key for ``model_name`` under this wrapper's task."""
        return f"{model_name}:{self.task}"

    def _load_model_sync(self) -> Any:
        """Load model (blocking — call via thread executor from async code).

        Tries the requested model first, then each entry of
        ``LOCAL_LLM_FALLBACKS``. When a fallback ends up being used it is also
        registered under the requested model's cache key, so later accesses
        return it directly instead of re-attempting the failed primary load on
        every call. ``shutdown`` removes that alias, which re-enables a retry.

        Returns:
            The loaded (or cached) LLM object.

        Raises:
            ImportError: if the txtai ``LLM`` class could not be imported.
            Exception: the last load error when every candidate fails.
        """
        cache_key = self._cache_key_for(self.model_path)
        if cache_key in _local_llm_cache:
            return _local_llm_cache[cache_key]
        if LLM is None:
            raise ImportError("txtai.pipeline.LLM is not available")

        # Normalize the task name: instruct/chat style models and the
        # "text-generation" alias are all loaded as "language-generation".
        task_to_use = (self.task or "language-generation").strip()
        if any(marker in self.model_path for marker in ("Qwen", "Instruct", "GPT", "Llama")):
            task_to_use = "language-generation"
        if task_to_use == "text-generation":
            task_to_use = "language-generation"

        # Requested model first, then fallbacks, de-duplicated in order.
        candidate_models = list(dict.fromkeys([self.model_path, *LOCAL_LLM_FALLBACKS]))

        last_error = None
        for candidate_model in candidate_models:
            is_fallback = candidate_model != self.model_path
            candidate_key = self._cache_key_for(candidate_model)

            if candidate_key in _local_llm_cache:
                model = _local_llm_cache[candidate_key]
                if is_fallback:
                    logger.warning(f"Using cached fallback local LLM model: {candidate_model}")
                    # Alias so the failed primary is not retried on each access.
                    _local_llm_cache[cache_key] = model
                return model

            logger.info(f"Loading local LLM (singleton): {candidate_model} (task={task_to_use})")
            try:
                model = LLM(path=candidate_model, task=task_to_use)
            except Exception as e:
                last_error = e
                message = str(e).lower()
                is_memory_issue = (
                    "paging file is too small" in message
                    or "os error 1455" in message
                    or "out of memory" in message
                    or "not enough memory" in message
                )
                if is_memory_issue:
                    logger.warning(
                        f"Local LLM memory load failure for '{candidate_model}', trying smaller fallback. Error: {e}"
                    )
                else:
                    logger.warning(
                        f"Local LLM load failed for '{candidate_model}', trying next fallback. Error: {e}"
                    )
                continue

            _local_llm_cache[candidate_key] = model
            if is_fallback:
                logger.warning(
                    f"Loaded fallback local LLM model '{candidate_model}' after failure on '{self.model_path}'"
                )
                # Alias so the failed primary is not retried on each access.
                _local_llm_cache[cache_key] = model
            return model

        # Every candidate failed: emit best-effort environment diagnostics.
        try:
            import transformers
            from transformers.pipelines import SUPPORTED_TASKS
            logger.error(
                f"LocalLLMWrapper init failed (model={self.model_path}, requested_task={task_to_use}, "
                f"transformers={getattr(transformers, '__version__', 'unknown')}, "
                f"supported_tasks={sorted(SUPPORTED_TASKS)[:50]})"
            )
        except Exception as diag_err:
            # Diagnostics are optional; never let them mask the real load error.
            logger.debug(f"[LocalLLMWrapper] Could not collect transformers diagnostics: {diag_err}")
        logger.error(f"Failed to initialize LocalLLMWrapper after fallback attempts: {last_error}")
        raise last_error

    @property
    def llm(self):
        """Sync accessor — lazy loads via global cache. Blocks on first call."""
        cache_key = self._cache_key_for(self.model_path)
        if cache_key in _local_llm_cache:
            self._initialized = True
            return _local_llm_cache[cache_key]
        result = self._load_model_sync()
        self._initialized = True
        return result

    async def initialize(self) -> bool:
        """Pre-load model asynchronously. Call at server startup to avoid first-request delay.

        Returns:
            True when the model is available, False when loading failed
            (the failure is logged, not raised).
        """
        if self._initialized:
            return True
        if self._cache_key_for(self.model_path) in _local_llm_cache:
            self._initialized = True
            return True
        try:
            # Run the blocking load in a worker thread.
            await asyncio.to_thread(self._load_model_sync)
        except Exception as e:
            logger.error(f"[LocalLLMWrapper] Async init failed for {self.model_path}: {e}")
            return False
        self._initialized = True
        return True

    async def ensure_initialized_async(self) -> bool:
        """Public async hook — ensures model is loaded without blocking the event loop."""
        if self._initialized:
            return True
        return await self.initialize()

    async def shutdown(self):
        """Drop this wrapper's entry from the shared cache and mark it uninitialized."""
        _local_llm_cache.pop(self._cache_key_for(self.model_path), None)
        self._initialized = False

    def __call__(self, prompt: str, **kwargs) -> str:
        """Invoke the underlying model with ``prompt`` (loads it on first use)."""
        return self.llm(prompt, **kwargs)

    def generate(self, prompt: str, **kwargs) -> str:
        """Alias of ``__call__`` for callers that expect a ``generate`` method."""
        return self.llm(prompt, **kwargs)
class SIFBaseAgent(BaseALwrityAgent):
    """Common base for SIF agents: a semantic intelligence service plus two LLM handles.

    ``shared_llm`` always wraps the shared ALwrity provider. The ``llm`` passed
    to the base class is either the caller-supplied object or a
    ``LocalLLMWrapper`` created for ``model_name``.
    """

    def __init__(
        self,
        intelligence_service: TxtaiIntelligenceService,
        user_id: str,
        agent_type: str = "sif_agent",
        model_name: str = "Qwen/Qwen2.5-1.5B-Instruct",
        llm: Any = None,
    ):
        # Shared provider handle, available to every agent.
        self.shared_llm = SharedLLMWrapper(user_id)

        # Default to a local model when the caller did not inject one.
        if llm is None:
            if not TXTAI_AVAILABLE or LLM is None:
                raise RuntimeError("txtai LLM is required for SIF agents but is not available")
            llm = LocalLLMWrapper(model_name, task="text-generation")

        super().__init__(user_id, agent_type, model_name, llm)
        self.intelligence = intelligence_service

    def _log_agent_operation(self, operation: str, **kwargs):
        """Log ``operation`` at info level and its keyword details, if any, at debug level."""
        logger.info(f"[{self.__class__.__name__}] {operation}")
        if not kwargs:
            return
        logger.debug(f"[{self.__class__.__name__}] Parameters: {kwargs}")

    async def _ensure_intelligence_ready(self) -> bool:
        """Initialize the intelligence service and report whether it is usable.

        Returns False when initialization raises, when the service is not
        flagged ``_initialized``, or when its ``embeddings`` attribute is falsy.
        """
        service = self.intelligence
        try:
            await service._ensure_initialized_async()
        except Exception as init_err:
            logger.warning(f"[{self.__class__.__name__}] Intelligence initialization failed: {init_err}")
            return False
        if not getattr(self.intelligence, "_initialized", False):
            return False
        return bool(self.intelligence.embeddings)

    async def initialize_async(self):
        """Async lifecycle hook: warm up the intelligence service, then the LLM if it supports it."""
        await self._ensure_intelligence_ready()
        agent_llm = getattr(self, "llm", None)
        if hasattr(agent_llm, "ensure_initialized_async"):
            await agent_llm.ensure_initialized_async()
        logger.info(f"[{self.__class__.__name__}] Async initialization complete")

    async def shutdown(self):
        """Async lifecycle hook: let the LLM release its resources when it exposes ``shutdown``."""
        agent_llm = getattr(self, "llm", None)
        if hasattr(agent_llm, "shutdown"):
            await agent_llm.shutdown()
        logger.info(f"[{self.__class__.__name__}] Shutdown complete")

    def _create_txtai_agent(self):
        """Build a tool-less txtai ``Agent`` around this agent's innermost LLM object.

        Raises:
            RuntimeError: when txtai or its ``Agent`` class is unavailable.
            Exception: any error raised while constructing the agent is logged
                and re-raised.
        """
        if not TXTAI_AVAILABLE or Agent is None:
            raise RuntimeError(f"[{self.__class__.__name__}] txtai Agent not available")
        try:
            # Peel up to three wrapper layers by following ``.llm`` attributes.
            innermost = self.llm
            remaining_layers = 3
            while remaining_layers > 0:
                innermost = getattr(innermost, "llm", innermost)
                remaining_layers -= 1
            return Agent(llm=innermost, tools=[])
        except Exception as e:
            logger.error(f"[{self.__class__.__name__}] Failed to create txtai Agent: {e}")
            raise
class StrategyArchitectAgent(SIFBaseAgent):
    """Agent for discovering content pillars and identifying strategic gaps."""

    # Words ignored when deriving topic labels from titles and metadata.
    _TOPIC_STOPWORDS = frozenset({
        "with", "from", "that", "this", "your", "about", "into", "using", "guide", "best",
        "tips", "what", "when", "where", "how", "the", "and", "for", "2024", "2025",
    })
    # Alphabetic words (hyphens allowed) of at least four characters.
    _TITLE_WORD_RE = re.compile(r"[a-zA-Z][a-zA-Z\-]{3,}")
    # Separators accepted inside string-valued topic metadata.
    _TOPIC_SPLIT_RE = re.compile(r"[,|/]")

    def __init__(self, intelligence_service: TxtaiIntelligenceService, user_id: str):
        super().__init__(intelligence_service, user_id, agent_type="strategy_architect")

    async def discover_pillars(self) -> List[Dict[str, Any]]:
        """Identify content pillars through semantic clustering.

        Returns:
            One dict per cluster with ``pillar_id``, ``indices``, ``size`` and
            ``confidence``; an empty list when nothing clusters or on error.
        """
        self._log_agent_operation("Discovering content pillars")
        try:
            # Let intelligence service perform lazy async initialization internally.
            clusters = await self.intelligence.cluster(min_score=0.6)
            if not clusters:
                logger.warning(f"[{self.__class__.__name__}] No clusters found")
                return []

            pillars = []
            for i, cluster_indices in enumerate(clusters):
                pillar = {
                    "pillar_id": f"pillar_{i}",
                    "indices": cluster_indices,
                    "size": len(cluster_indices),
                    "confidence": self._calculate_cluster_confidence(cluster_indices),
                }
                pillars.append(pillar)
                logger.debug(
                    f"[{self.__class__.__name__}] Created pillar {pillar['pillar_id']} with {pillar['size']} items"
                )

            logger.info(f"[{self.__class__.__name__}] Discovered {len(pillars)} content pillars")
            return pillars
        except Exception as e:
            logger.error(f"[{self.__class__.__name__}] Failed to discover pillars: {e}")
            logger.error(f"[{self.__class__.__name__}] Full traceback: {traceback.format_exc()}")
            return []

    async def analyze_content_strategy(self, website_data: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Analyze content strategy based on website data and semantic insights.

        Args:
            website_data: Dictionary containing website analysis data

        Returns:
            List of strategic recommendations (empty on error)
        """
        self._log_agent_operation("Analyzing content strategy")
        try:
            recommendations = []

            # 1. Discover existing pillars
            pillars = await self.discover_pillars()

            # 2. Analyze gaps based on pillars (simplified logic for now)
            if not pillars:
                recommendations.append({
                    "type": "strategy_gap",
                    "priority": "high",
                    "title": "Establish Core Content Pillars",
                    "description": "No clear content clusters found. Focus on defining 3-5 core topics to build authority."
                })
            else:
                # Suggest strengthening weak pillars
                for pillar in pillars:
                    if pillar['size'] < 3:
                        recommendations.append({
                            "type": "content_depth",
                            "priority": "medium",
                            "title": f"Strengthen Pillar {pillar['pillar_id']}",
                            "description": "This topic cluster has few articles. Create more content to establish authority.",
                            "pillar_id": pillar['pillar_id']
                        })

            # 3. Add generic recommendations based on website data if available
            if website_data and not website_data.get('description'):
                recommendations.append({
                    "type": "metadata",
                    "priority": "high",
                    "title": "Missing Meta Description",
                    "description": "Website is missing a meta description. Add one to improve SEO CTR."
                })

            logger.info(f"[{self.__class__.__name__}] Generated {len(recommendations)} strategic recommendations")
            return recommendations
        except Exception as e:
            logger.error(f"[{self.__class__.__name__}] Failed to analyze content strategy: {e}")
            return []

    def _calculate_cluster_confidence(self, cluster_indices: List[int]) -> float:
        """Return a size-based confidence in [0, 1]; ten or more members saturate at 1.0."""
        return min(1.0, len(cluster_indices) / 10.0)

    async def find_semantic_gaps(self, competitor_indices: List[Any]) -> List[Dict[str, Any]]:
        """Compare user content vs competitor content to find missing topics.

        Args:
            competitor_indices: Optional filter for competitor documents. Entries
                are matched against document ids as strings; integer entries are
                additionally treated as positions in the fetched document list.
                A falsy value (including ``None``) disables the filter.

        Returns:
            Up to 12 gap dicts, strongest first; an empty list when either the
            user or competitor side has no documents, or on error.
        """
        # None-safe: a missing filter must not crash before the try block.
        self._log_agent_operation(
            "Finding semantic content gaps", competitor_count=len(competitor_indices or [])
        )
        try:
            documents = await self._fetch_index_documents()
            if not documents:
                logger.info(f"[{self.__class__.__name__}] No indexed documents available for gap detection")
                return []

            competitor_docs, user_docs = [], []
            allowed_competitor_ids = {str(idx) for idx in competitor_indices} if competitor_indices else None
            if allowed_competitor_ids:
                for idx in competitor_indices:
                    if isinstance(idx, int) and 0 <= idx < len(documents):
                        allowed_competitor_ids.add(str(documents[idx].get("id", "")))

            for doc in documents:
                role = self._infer_document_role(doc.get("metadata") or {})
                if role == "competitor":
                    if allowed_competitor_ids and str(doc.get("id")) not in allowed_competitor_ids:
                        continue
                    competitor_docs.append(doc)
                elif role == "user":
                    user_docs.append(doc)

            if not competitor_docs or not user_docs:
                logger.info(
                    f"[{self.__class__.__name__}] Insufficient split for gap analysis: "
                    f"user_docs={len(user_docs)}, competitor_docs={len(competitor_docs)}"
                )
                return []

            competitor_topics = self._extract_topic_density(competitor_docs)
            user_topics = self._extract_topic_density(user_docs)
            competitor_topic_docs = self._map_topic_to_doc_titles(competitor_docs)
            user_topic_docs = self._map_topic_to_doc_titles(user_docs)

            gaps = []
            for topic, competitor_density in competitor_topics.items():
                user_density = user_topics.get(topic, 0.0)
                coverage_delta = competitor_density - user_density
                # Ignore topics where competitors are not clearly ahead.
                if coverage_delta <= 0.08:
                    continue

                competitor_support = len(competitor_topic_docs.get(topic, []))
                user_support = len(user_topic_docs.get(topic, []))
                confidence = max(0.0, min(1.0, (coverage_delta * 0.65) + (min(1.0, competitor_support / 4) * 0.35)))
                severity_score = max(0.0, min(1.0, (coverage_delta * 0.7) + (confidence * 0.3)))
                priority = "high" if severity_score >= 0.72 else "medium" if severity_score >= 0.45 else "low"

                gaps.append({
                    "topic": topic,
                    "priority": priority,
                    "reason": (
                        f"Competitors mention '{topic}' substantially more often "
                        f"(density {competitor_density:.2f} vs {user_density:.2f})."
                    ),
                    "confidence": round(confidence, 3),
                    "severity_score": round(severity_score, 3),
                    "coverage_delta": round(coverage_delta, 4),
                    "topic_density": {
                        "competitor": round(competitor_density, 4),
                        "user": round(user_density, 4),
                        "gap": round(coverage_delta, 4)
                    },
                    "evidence": {
                        "competitor_sample_titles": self._sample_titles_for_topic(competitor_docs, topic),
                        "user_sample_titles": self._sample_titles_for_topic(user_docs, topic),
                        "competitor_supporting_docs": competitor_support,
                        "user_supporting_docs": user_support,
                        "competitor_doc_count": len(competitor_docs),
                        "user_doc_count": len(user_docs)
                    }
                })

            gaps.sort(
                key=lambda item: (
                    item.get("severity_score", 0),
                    item.get("confidence", 0),
                    item.get("topic_density", {}).get("gap", 0)
                ),
                reverse=True
            )
            return gaps[:12]
        except Exception as e:
            logger.error(f"[{self.__class__.__name__}] Failed to find semantic gaps: {e}")
            logger.error(f"[{self.__class__.__name__}] Full traceback: {traceback.format_exc()}")
            return []

    async def _fetch_index_documents(self) -> List[Dict[str, Any]]:
        """Fetch indexed documents and normalize metadata from txtai result objects.

        Tries SQL-style ``select`` queries sized to the index count first, then a
        handful of keyword searches, de-duplicating rows by id. Queries that fail
        are skipped. Rows that are not dicts are ignored because they carry no
        text or metadata to analyze.

        Returns:
            A list of ``{"id", "text", "metadata"}`` dicts; empty when the
            intelligence service is not ready.
        """
        if not await self._ensure_intelligence_ready():
            return []

        embeddings = self.intelligence.embeddings
        limit = 0
        if hasattr(embeddings, "count"):
            try:
                limit = int(await asyncio.to_thread(embeddings.count))
            except Exception:
                limit = 0

        candidate_queries = []
        if limit > 0:
            candidate_queries.extend([
                f"select id, text, object from txtai limit {limit}",
                f"select id, text, tags from txtai limit {limit}"
            ])
        candidate_queries.extend(["marketing", "content", "seo", "strategy", "social media"])

        documents = []
        seen_ids = set()
        for query in candidate_queries:
            is_sql = query.startswith("select")
            query_limit = limit if is_sql and limit > 0 else max(10, limit or 50)
            try:
                rows = await asyncio.to_thread(embeddings.search, query, limit=query_limit)
            except Exception as query_err:
                # Best-effort: a query form the index does not support is skipped.
                logger.debug(f"[{self.__class__.__name__}] Index query failed ({query!r}): {query_err}")
                continue

            for row in rows or []:
                if not isinstance(row, dict):
                    continue
                doc_id = str(row.get("id", ""))
                dedupe_key = doc_id or str(hash(f"{row.get('text','')}::{row.get('score',0)}"))
                if dedupe_key in seen_ids:
                    continue
                seen_ids.add(dedupe_key)
                documents.append({
                    "id": doc_id,
                    "text": row.get("text", "") or "",
                    "metadata": self._normalize_metadata(row)
                })

            # Stop querying once the whole index has been collected.
            if limit > 0 and len(documents) >= limit:
                break
        return documents

    def _normalize_metadata(self, row: Dict[str, Any]) -> Dict[str, Any]:
        """Return the first dict-like payload found under object/tags/metadata/meta.

        String payloads are accepted when they parse as a JSON object. Returns an
        empty dict when no key yields a dict.
        """
        for key in ("object", "tags", "metadata", "meta"):
            payload = row.get(key)
            if isinstance(payload, dict):
                return payload
            if isinstance(payload, str):
                try:
                    parsed = json.loads(payload)
                except (ValueError, RecursionError):
                    continue
                if isinstance(parsed, dict):
                    return parsed
        return {}

    def _extract_topic_density(self, documents: List[Dict[str, Any]]) -> Dict[str, float]:
        """Return, per topic, the fraction of ``documents`` mentioning it.

        Topics seen in fewer than two documents are dropped.
        """
        topic_counter: Counter = Counter()
        for doc in documents:
            topic_counter.update(self._extract_topics_from_document(doc))
        total_docs = max(1, len(documents))
        return {
            topic: count / total_docs
            for topic, count in topic_counter.items()
            if count >= 2
        }

    def _infer_document_role(self, metadata: Dict[str, Any]) -> str:
        """Infer whether a document belongs to user content or competitor content.

        Looks for marker substrings in the type/doc_type/content_type/source/
        origin fields; competitor markers win over user markers.

        Returns:
            ``"competitor"``, ``"user"`` or ``"unknown"``.
        """
        signals = [
            metadata.get("type", ""),
            metadata.get("doc_type", ""),
            metadata.get("content_type", ""),
            metadata.get("source", ""),
            metadata.get("origin", "")
        ]
        signal_blob = " ".join(str(item).lower() for item in signals if item)
        if any(token in signal_blob for token in ("competitor", "rival", "market_peer")):
            return "competitor"
        if any(token in signal_blob for token in ("user", "owned", "first_party", "customer_site")):
            return "user"
        return "unknown"

    def _extract_topics_from_document(self, doc: Dict[str, Any]) -> List[str]:
        """Extract normalized topic labels from metadata and lightweight text fields.

        Returns:
            Sorted, de-duplicated lowercase labels of at least four characters,
            excluding pure digits and stopwords.
        """
        metadata = doc.get("metadata") or {}
        candidates: List[str] = []
        for key in ("topics", "topic", "themes", "theme", "keywords", "keyword", "tags", "category", "categories"):
            value = metadata.get(key)
            if isinstance(value, list):
                candidates.extend(str(v) for v in value if v)
            elif isinstance(value, str) and value.strip():
                candidates.extend(self._TOPIC_SPLIT_RE.split(value))

        # Fall back to the start of the body text when there is no title.
        title = metadata.get("title") or (doc.get("text") or "")[:160]
        if title:
            candidates.extend(self._TITLE_WORD_RE.findall(str(title).lower()))

        normalized = set()
        for item in candidates:
            if not item:
                continue
            label = item.strip().lower()
            if len(label) < 4 or label.isdigit() or label in self._TOPIC_STOPWORDS:
                continue
            normalized.add(label)
        return sorted(normalized)

    def _map_topic_to_doc_titles(self, documents: List[Dict[str, Any]]) -> Dict[str, List[str]]:
        """Map each topic to a list of document titles that support it."""
        mapping: Dict[str, List[str]] = {}
        for doc in documents:
            metadata = doc.get("metadata") or {}
            title = str(metadata.get("title") or (doc.get("text") or "")[:100] or "Untitled")
            for topic in self._extract_topics_from_document(doc):
                mapping.setdefault(topic, []).append(title)
        return mapping

    def _sample_titles_for_topic(self, documents: List[Dict[str, Any]], topic: str, limit: int = 3) -> List[str]:
        """Return up to ``limit`` titles of documents whose title or metadata mentions ``topic``."""
        samples = []
        topic_lower = topic.lower()
        for doc in documents:
            metadata = doc.get("metadata") or {}
            title = metadata.get("title") or (doc.get("text") or "")[:100]
            if not title:
                continue
            haystack = f"{title} {json.dumps(metadata, default=str)}".lower()
            if topic_lower in haystack:
                samples.append(str(title))
                if len(samples) >= limit:
                    break
        return samples
class LinkGraphAgent(SIFBaseAgent):
    """
    Agent for internal link suggestions, graph management, and authority analysis.
    Implements the semantic link graph using SIF and GSC/Bing data.
    """

    RELEVANCE_THRESHOLD = 0.6  # Minimum relevance score for link suggestions
    MAX_SUGGESTIONS = 10  # Maximum number of link suggestions
    MIN_DRAFT_CHARS = 50  # Drafts shorter than this (after stripping) are skipped

    def __init__(self, intelligence_service: TxtaiIntelligenceService, user_id: str, sif_service: Any = None):
        super().__init__(intelligence_service, user_id, agent_type="link_graph")
        self.sif_service = sif_service

    async def suggest_internal_links(self, draft: str) -> List[Dict[str, Any]]:
        """Suggest internal links for ``draft``; thin alias of ``link_suggester``."""
        return await self.link_suggester(draft)

    async def link_suggester(self, draft: str) -> List[Dict[str, Any]]:
        """
        Tool: Suggests internal links.

        Searches the semantic index with the draft text and keeps results whose
        score reaches ``RELEVANCE_THRESHOLD``. ``final_score`` currently equals
        the semantic relevance; no authority weighting is applied here.

        Returns:
            Suggestions sorted by ``final_score`` descending; an empty list when
            the service is not ready, the draft is too short, nothing matches,
            or an error occurs.
        """
        # None-safe: the length is logged before the try block.
        draft = draft or ""
        self._log_agent_operation("Suggesting internal links", draft_length=len(draft))
        try:
            if not await self._ensure_intelligence_ready():
                logger.error(f"[{self.__class__.__name__}] Intelligence service not initialized")
                return []
            if len(draft.strip()) < self.MIN_DRAFT_CHARS:
                logger.warning(f"[{self.__class__.__name__}] Draft too short for meaningful link suggestions")
                return []

            # 1. Get Semantic Candidates
            results = await self.intelligence.search(draft, limit=self.MAX_SUGGESTIONS)
            if not results:
                logger.info(f"[{self.__class__.__name__}] No relevant internal pages found")
                return []

            suggestions = []
            for result in results:
                # A missing or None score counts as zero relevance.
                relevance_score = result.get('score') or 0.0
                url = result.get('id', 'unknown')
                if relevance_score < self.RELEVANCE_THRESHOLD:
                    continue
                suggestions.append({
                    "url": url,
                    "relevance": relevance_score,
                    "final_score": relevance_score,
                    "confidence": self._calculate_link_confidence(relevance_score),
                    "reason": f"Semantic similarity: {relevance_score:.3f}"
                })
                logger.debug(
                    f"[{self.__class__.__name__}] Added link suggestion: {url} (score: {relevance_score:.3f})"
                )

            # Sort by final score
            suggestions.sort(key=lambda x: x['final_score'], reverse=True)
            logger.info(f"[{self.__class__.__name__}] Generated {len(suggestions)} internal link suggestions")
            return suggestions
        except Exception as e:
            logger.error(f"[{self.__class__.__name__}] Failed to suggest internal links: {e}")
            logger.error(f"[{self.__class__.__name__}] Full traceback: {traceback.format_exc()}")
            return []

    async def graph_builder(self) -> Dict[str, Any]:
        """
        Tool: Builds/Visualizes the semantic link graph.

        Clusters the index and returns one ``topic_cluster`` node per cluster.
        Per-item nodes and edges are not derived yet (that needs item metadata
        lookups), so ``edges`` is currently always empty.

        Returns:
            A dict with ``graph_stats``, ``nodes``, ``edges``, ``structure`` and
            ``timestamp``; or ``{"error": ...}`` when the service is not ready
            or clustering fails.
        """
        self._log_agent_operation("Building semantic link graph")
        try:
            if not await self._ensure_intelligence_ready():
                return {"error": "Intelligence service not initialized"}

            # Treat a None result from the clustering call as "no clusters".
            clusters = (await self.intelligence.cluster(min_score=0.5)) or []

            nodes = [
                {"id": f"cluster_{i}", "type": "topic_cluster", "size": len(cluster)}
                for i, cluster in enumerate(clusters)
            ]
            edges: List[Dict[str, Any]] = []

            return {
                "graph_stats": {
                    "total_clusters": len(clusters),
                    "total_nodes": sum(len(c) for c in clusters)
                },
                "nodes": nodes,
                "edges": edges,
                "structure": "hierarchical",  # vs flat
                "timestamp": datetime.utcnow().isoformat()
            }
        except Exception as e:
            logger.error(f"[{self.__class__.__name__}] Failed to build graph: {e}")
            return {"error": str(e)}

    async def authority_analyzer(self, target_url: Optional[str] = None) -> Dict[str, Any]:
        """
        Tool: Analyzes the authority of the site or specific pages using GSC/Bing data.

        ``target_url`` is only logged at present; the report is site-level and
        built from the SEO dashboard context returned by ``sif_service``.

        Returns:
            The authority report, the dashboard context itself when it contains
            an ``"error"`` key, or ``{"error": ...}`` when no SIF service is
            configured or the lookup raises.
        """
        self._log_agent_operation("Analyzing authority", target_url=target_url)
        if not self.sif_service:
            return {"error": "SIF Service unavailable for authority analysis"}
        try:
            # 1. Get Dashboard Context
            context = await self.sif_service.get_seo_dashboard_context()
            if "error" in context:
                return context

            data = context.get("dashboard_data", {})
            summary = data.get("summary", {})
            health = data.get("health_score", {})

            # 2. Extract Authority Metrics
            return {
                "domain_authority_proxy": {
                    "health_score": health.get("score"),
                    "total_clicks": summary.get("clicks"),
                    "avg_position": summary.get("position")
                },
                "page_authority": "Page-level authority requires granular GSC data (Planned)",  # Placeholder
                "timestamp": datetime.utcnow().isoformat()
            }
        except Exception as e:
            logger.error(f"[{self.__class__.__name__}] Authority analysis failed: {e}")
            return {"error": str(e)}

    def _calculate_link_confidence(self, relevance_score: float) -> float:
        """Scale relevance by 1.5 and cap the result at 1.0."""
        return min(1.0, relevance_score * 1.5)

    async def optimize_anchor_text(self, target_url: str, context: str) -> str:
        """Suggest anchor text for a link by searching the SIF index for the target page.

        Uses up to five words longer than four characters from the best match;
        falls back to ``_extract_anchor_from_context`` when the index is not
        ready, returns nothing usable, or raises.
        """
        # None-safe: the length is logged before the try block.
        context = context or ""
        self._log_agent_operation("Optimizing anchor text", target_url=target_url, context_length=len(context))
        try:
            if not await self._ensure_intelligence_ready():
                return self._extract_anchor_from_context(target_url, context)

            results = await self.intelligence.search(f"{target_url} {context}", limit=3)
            if results:
                text = str(results[0].get("text", "") or results[0].get("id", ""))
                words = [w for w in text.split() if len(w) > 4][:5]
                if words:
                    return " ".join(words)
            return self._extract_anchor_from_context(target_url, context)
        except Exception as e:
            logger.error(f"[{self.__class__.__name__}] optimize_anchor_text failed: {e}")
            return self._extract_anchor_from_context(target_url, context)

    def _extract_anchor_from_context(self, target_url: str, context: str) -> str:
        """Extract a usable anchor text from the URL or context when SIF is unavailable.

        Prefers up to four title-cased words (longer than three characters) from
        the URL path, then up to four words (longer than four characters) from
        ``context``, and finally the literal ``"learn more"``.
        """
        from urllib.parse import urlparse

        try:
            path = urlparse(target_url or "").path.strip("/").replace("-", " ").replace("/", " ")
            path_words = [w for w in path.split() if len(w) > 3]
            if path_words:
                return " ".join(path_words[:4]).title()
        except Exception:
            # An unparsable URL just means we fall through to the context words.
            pass

        context_words = [w for w in (context or "").split() if len(w) > 4]
        return " ".join(context_words[:4]).title() if context_words else "learn more"
class CitationExpert(SIFBaseAgent):
    """
    Agent for fact-checking, citation generation, and evidence verification.
    """

    EVIDENCE_THRESHOLD = 0.7  # Minimum relevance score for evidence
    CITATION_THRESHOLD = 0.6  # Minimum relevance score for a suggested citation
    MAX_EVIDENCE = 5  # Maximum number of evidence pieces to return
    MAX_CLAIMS = 5  # Maximum number of claims verified per content piece

    # Sentence boundary: a period followed by whitespace or end of text, so
    # decimals ("42.5%") and dotted tokens are not cut in half.
    _SENTENCE_END_RE = re.compile(r"\.(?=\s|$)")

    def __init__(self, intelligence_service: TxtaiIntelligenceService, user_id: str):
        super().__init__(intelligence_service, user_id, agent_type="citation_expert")

    async def fact_checker(self, claim: str) -> List[Dict[str, Any]]:
        """
        Tool: Verifies facts against trusted research data.
        Returns the evidence found by ``verify_facts`` for ``claim``.
        """
        return await self.verify_facts(claim)

    async def citation_finder(self, topic: str) -> List[Dict[str, Any]]:
        """
        Tool: Suggests authoritative citations for a given topic.

        Returns:
            One dict per search hit scoring above ``CITATION_THRESHOLD`` with
            ``source``, ``title`` (first 100 characters plus an ellipsis),
            ``relevance`` and ``citation_text``; an empty list when the service
            is not ready or the search fails.
        """
        self._log_agent_operation("Finding citations", topic=topic)
        try:
            if not await self._ensure_intelligence_ready():
                return []

            # Search for highly relevant content
            results = await self.intelligence.search(topic, limit=self.MAX_EVIDENCE)
            citations = []
            for result in results or []:
                relevance = result.get('score') or 0.0
                if relevance <= self.CITATION_THRESHOLD:
                    continue
                source = result.get('id')
                citations.append({
                    "source": source,
                    # Tolerate hits whose text is missing or None.
                    "title": (result.get('text') or '')[:100] + "...",
                    "relevance": relevance,
                    "citation_text": f"Source: {source} (Relevance: {relevance:.2f})"
                })
            return citations
        except Exception as e:
            logger.error(f"[{self.__class__.__name__}] Citation finder failed: {e}")
            return []

    async def claim_verifier(self, content: str) -> Dict[str, Any]:
        """
        Tool: Detects unsupported statements and hallucinations.

        Claims are picked heuristically: sentences longer than 20 characters
        that contain a digit or the substrings "show" / "study". At most
        ``MAX_CLAIMS`` claims are checked with ``verify_facts``.

        Returns:
            ``{"status": "no_claims_detected", "verified_claims": []}`` when no
            sentence qualifies; otherwise a dict with ``status`` "completed",
            the per-claim results, and ``verification_score`` (the supported
            fraction).
        """
        text = content or ""
        self._log_agent_operation("Verifying claims in content", content_length=len(text))

        # 1. Extract potential claims (heuristic: numbers, 'research shows', etc.)
        # This is a simplified extraction. A real implementation would use NLP/LLM.
        claims = []
        for raw_sentence in self._SENTENCE_END_RE.split(text):
            sentence = raw_sentence.strip()
            lowered = sentence.lower()
            looks_factual = (
                any(char.isdigit() for char in sentence)
                or "show" in lowered
                or "study" in lowered
            )
            if looks_factual and len(sentence) > 20:
                claims.append(sentence)

        if not claims:
            return {"status": "no_claims_detected", "verified_claims": []}

        verified_results = []
        for claim in claims[:self.MAX_CLAIMS]:  # Limit the number of claims for performance
            evidence = await self.verify_facts(claim)
            verified_results.append({
                "claim": claim,
                "status": "supported" if evidence else "unsupported",
                "evidence_count": len(evidence),
                "top_evidence": evidence[0] if evidence else None
            })

        supported = sum(1 for entry in verified_results if entry['status'] == 'supported')
        return {
            "status": "completed",
            "verified_claims": verified_results,
            "verification_score": supported / len(verified_results)
        }

    async def verify_facts(self, claim: str) -> List[Dict[str, Any]]:
        """Verify a single claim against intelligence data.

        Returns:
            Evidence dicts (``text``, ``source``, ``confidence``) for search
            hits scoring above ``EVIDENCE_THRESHOLD``. Search errors propagate
            to the caller.
        """
        results = await self.intelligence.search(claim, limit=3)
        evidence = []
        # A None result set or a None score is treated as "no evidence".
        for result in results or []:
            score = result.get('score') or 0
            if score > self.EVIDENCE_THRESHOLD:
                evidence.append({
                    "text": result.get('text'),
                    "source": result.get('id'),
                    "confidence": result.get('score')
                })
        return evidence