Files
ajaysi 923fa671fe feat: ContentGuardianAgent, onboarding UX, Team Activity action wiring, docs, agent help modal
ContentGuardianAgent consolidation:
- Merge 3 duplicate classes into single source in specialized/content_guardian.py
- Watchdog audit_committee() with heuristic scoring, coverage gaps, overlaps, alerts
- Remove misleading rejection_rate() helper; use acceptance_rate directly
- Integrate audit + alerts + trend signals into today_workflow_service.py

Team Activity page:
- QualityAuditPanel: health ring, per-agent critiques, coverage gaps, overlaps
- TrendSignalsPanel: opportunity cards with urgency/impact/coverage bars
- AlertBanner: persistent dismiss via POST /alerts/{id}/mark-read
- AgentHelpModal: dialog showing all 8 agents with descriptions, tools, schedule
- QualityAuditPanel action buttons: Fill gap -> /content-planning, Resolve overlap, View CTA on alerts/issues
- TrendSignalsPanel action buttons: Create content from this trend -> /blog-writer with trend context state

Onboarding system:
- Step 4 validation: no auto-pass via basic_ready; requires persona data or explicit progression
- Step 5 validation: logs warning on auto-pass without integration data
- OnboardingCompletionService: single DB session, transactional task creation, upsert pattern
- Business-without-website: nullable website_url on SIFIndexingTask and MarketTrendsTask
- DeepCompetitorAnalysisExecutor: 5-min timeout, 10-competitor cap, asyncio.wait_for
- Persona generation: async with 30s timeout, falls back to scheduler
- OnboardingProgressService.reset_onboarding(): resets session + pauses all DB tasks
- OnboardingControlService.reset_onboarding(): also cancels APScheduler jobs
- FinalStep TaskSchedulingPanel: shows scheduled/failed tasks after completion, 8s auto-redirect
- onboarding_completed agent activity event logged to feed

Documentation:
- docs-site/features/onboarding/: overview, steps, scheduler-tasks, technical-reference (4 pages)
- docs-site/mkdocs.yml: added Onboarding System nav section
- docs-site/features/sif-agents/: overview, agent-directory, committee-system, content-guardian (4 pages)
- docs-site/features/team-activity/: overview, quality-audit, trend-signals, alert-system (4 pages)
- docs-site/features/todays-workflow/: updated overview, technical-architecture, workflow-guide, api-reference
2026-06-01 12:24:31 +05:30

423 lines
19 KiB
Python

"""
SIF Agent Interfaces
Defines the specialized agents for digital marketing and SEO.
Each agent leverages TxtaiIntelligenceService for semantic operations.
"""
import traceback
from typing import List, Dict, Any, Optional
from datetime import datetime
from loguru import logger
from .txtai_service import TxtaiIntelligenceService
class SIFBaseAgent:
def __init__(self, intelligence_service: TxtaiIntelligenceService):
self.intelligence = intelligence_service
def _log_agent_operation(self, operation: str, **kwargs):
"""Standardized logging for agent operations."""
logger.info(f"[{self.__class__.__name__}] {operation}")
if kwargs:
logger.debug(f"[{self.__class__.__name__}] Parameters: {kwargs}")
class StrategyArchitectAgent(SIFBaseAgent):
"""Agent for discovering content pillars and identifying strategic gaps."""
async def discover_pillars(self) -> List[Dict[str, Any]]:
"""Identify content pillars through semantic clustering."""
self._log_agent_operation("Discovering content pillars")
try:
# Check if intelligence service is initialized
if not self.intelligence.is_initialized():
logger.error(f"[{self.__class__.__name__}] Intelligence service not initialized")
return []
clusters = await self.intelligence.cluster(min_score=0.6)
if not clusters:
logger.warning(f"[{self.__class__.__name__}] No clusters found")
return []
# Create pillar objects with metadata
pillars = []
for i, cluster_indices in enumerate(clusters):
pillar = {
"pillar_id": f"pillar_{i}",
"indices": cluster_indices,
"size": len(cluster_indices),
"confidence": self._calculate_cluster_confidence(cluster_indices)
}
pillars.append(pillar)
logger.debug(f"[{self.__class__.__name__}] Created pillar {pillar['pillar_id']} with {pillar['size']} items")
logger.info(f"[{self.__class__.__name__}] Discovered {len(pillars)} content pillars")
return pillars
except Exception as e:
logger.error(f"[{self.__class__.__name__}] Failed to discover pillars: {e}")
logger.error(f"[{self.__class__.__name__}] Full traceback: {traceback.format_exc()}")
return []
def _calculate_cluster_confidence(self, cluster_indices: List[int]) -> float:
"""Calculate confidence score for a cluster based on its size and coherence."""
# Simple confidence based on cluster size - larger clusters are more reliable
return min(1.0, len(cluster_indices) / 10.0)
async def find_semantic_gaps(self, competitor_indices: List[int]) -> List[Dict[str, Any]]:
"""Compare user content vs competitor content to find missing topics."""
self._log_agent_operation("Finding semantic content gaps", competitor_count=len(competitor_indices))
try:
# STUB: Implement cross-index comparison
# This would involve:
# 1. Getting user content topics/themes
# 2. Getting competitor content topics/themes
# 3. Finding topics competitors cover but user doesn't
logger.info(f"[{self.__class__.__name__}] Found semantic gaps analysis stub")
return [
{"topic": "Topic A", "priority": "high", "reason": "Competitor coverage gap"},
{"topic": "Topic B", "priority": "medium", "reason": "Emerging trend"}
]
except Exception as e:
logger.error(f"[{self.__class__.__name__}] Failed to find semantic gaps: {e}")
logger.error(f"[{self.__class__.__name__}] Full traceback: {traceback.format_exc()}")
return []
class LinkGraphAgent(SIFBaseAgent):
"""
Agent for internal link suggestions, graph management, and authority analysis.
Implements the semantic link graph using SIF and GSC/Bing data.
"""
RELEVANCE_THRESHOLD = 0.6 # Minimum relevance score for link suggestions
MAX_SUGGESTIONS = 10 # Maximum number of link suggestions
def __init__(self, intelligence_service: TxtaiIntelligenceService, sif_service: Any = None):
super().__init__(intelligence_service)
self.sif_service = sif_service
async def suggest_internal_links(self, draft: str) -> List[Dict[str, Any]]:
"""Suggest internal links based on semantic proximity and authority."""
return await self.link_suggester(draft)
async def link_suggester(self, draft: str) -> List[Dict[str, Any]]:
"""
Tool: Suggests internal links.
Analyzes draft content and finds semantically relevant pages, boosted by authority.
"""
self._log_agent_operation("Suggesting internal links", draft_length=len(draft))
try:
if not self.intelligence.is_initialized():
logger.error(f"[{self.__class__.__name__}] Intelligence service not initialized")
return []
if not draft or len(draft.strip()) < 50: # Reduced threshold for testing
logger.warning(f"[{self.__class__.__name__}] Draft too short for meaningful link suggestions")
return []
# 1. Get Semantic Candidates
results = await self.intelligence.search(draft, limit=self.MAX_SUGGESTIONS)
if not results:
logger.info(f"[{self.__class__.__name__}] No relevant internal pages found")
return []
# 2. Get Authority Data (if available)
authority_map = {}
if self.sif_service:
try:
# Fetch dashboard context to get top performing content
# Note: This relies on what's available in the SIF index/dashboard summary
dashboard_context = await self.sif_service.get_seo_dashboard_context()
if "error" not in dashboard_context:
# Extract top queries/pages if available in summary
# Ideally, we'd have a map of URL -> Authority Score
# For now, we'll try to extract what we can
data = dashboard_context.get("dashboard_data", {})
summary = data.get("summary", {})
# Example: Boost if site health is good (general confidence)
site_health = data.get("health_score", {}).get("score", 0)
# If we had top pages in the summary, we'd use them.
# For now, we'll use a placeholder authority map or just the site health
pass
except Exception as e:
logger.warning(f"Failed to fetch authority data: {e}")
suggestions = []
for result in results:
relevance_score = result.get('score', 0.0)
url = result.get('id', 'unknown')
# Apply authority boost (placeholder logic)
# In a full implementation, we'd look up 'url' in authority_map
authority_boost = 1.0
final_score = relevance_score * authority_boost
if final_score >= self.RELEVANCE_THRESHOLD:
suggestion = {
"url": url,
"relevance": relevance_score,
"final_score": final_score,
"confidence": self._calculate_link_confidence(final_score),
"reason": f"Semantic similarity: {relevance_score:.3f}"
}
suggestions.append(suggestion)
logger.debug(f"[{self.__class__.__name__}] Added link suggestion: {url} (score: {final_score:.3f})")
# Sort by final score
suggestions.sort(key=lambda x: x['final_score'], reverse=True)
logger.info(f"[{self.__class__.__name__}] Generated {len(suggestions)} internal link suggestions")
return suggestions
except Exception as e:
logger.error(f"[{self.__class__.__name__}] Failed to suggest internal links: {e}")
logger.error(f"[{self.__class__.__name__}] Full traceback: {traceback.format_exc()}")
return []
async def graph_builder(self) -> Dict[str, Any]:
"""
Tool: Builds/Visualizes the semantic link graph.
Returns the structure of the graph (nodes and edges) for visualization or analysis.
"""
self._log_agent_operation("Building semantic link graph")
try:
if not self.intelligence.is_initialized():
return {"error": "Intelligence service not initialized"}
# This is a resource-intensive operation in a real vector DB.
# Here we simulate the graph structure based on recent content or clusters.
# 1. Get Clusters (Nodes)
clusters = await self.intelligence.cluster(min_score=0.5)
nodes = []
edges = []
for i, cluster in enumerate(clusters):
cluster_id = f"cluster_{i}"
nodes.append({
"id": cluster_id,
"type": "topic_cluster",
"size": len(cluster)
})
# Add content items as nodes linked to cluster
for item_idx in cluster:
# We need to retrieve item metadata.
# txtai cluster returns indices. We might need to query by index or ID.
# For this implementation, we'll return a simplified view.
pass
return {
"graph_stats": {
"total_clusters": len(clusters),
"total_nodes": sum(len(c) for c in clusters)
},
"structure": "hierarchical", # vs flat
"timestamp": datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"[{self.__class__.__name__}] Failed to build graph: {e}")
return {"error": str(e)}
async def authority_analyzer(self, target_url: Optional[str] = None) -> Dict[str, Any]:
"""
Tool: Analyzes the authority of the site or specific pages using GSC/Bing data.
"""
self._log_agent_operation("Analyzing authority", target_url=target_url)
if not self.sif_service:
return {"error": "SIF Service unavailable for authority analysis"}
try:
# 1. Get Dashboard Context
context = await self.sif_service.get_seo_dashboard_context()
if "error" in context:
return context
data = context.get("dashboard_data", {})
summary = data.get("summary", {})
health = data.get("health_score", {})
# 2. Extract Authority Metrics
authority_report = {
"domain_authority_proxy": {
"health_score": health.get("score"),
"total_clicks": summary.get("clicks"),
"avg_position": summary.get("position")
},
"page_authority": "Page-level authority requires granular GSC data (Planned)", # Placeholder
"timestamp": datetime.utcnow().isoformat()
}
return authority_report
except Exception as e:
logger.error(f"[{self.__class__.__name__}] Authority analysis failed: {e}")
return {"error": str(e)}
def _calculate_link_confidence(self, relevance_score: float) -> float:
"""Calculate confidence score for a link suggestion."""
# Simple confidence based on relevance score
return min(1.0, relevance_score * 1.5)
async def optimize_anchor_text(self, target_url: str, context: str) -> str:
"""Suggest the best anchor text for a given link based on target page context."""
self._log_agent_operation("Optimizing anchor text", target_url=target_url, context_length=len(context))
try:
# In a real implementation, we would fetch the target page content via SIF
# and use an LLM to generate the anchor text.
# Placeholder for LLM call
# if self.llm: ...
logger.info(f"[{self.__class__.__name__}] Anchor text optimization stub completed")
return "relevant anchor text" # Placeholder
except Exception as e:
logger.error(f"[{self.__class__.__name__}] Failed to optimize anchor text: {e}")
logger.error(f"[{self.__class__.__name__}] Full traceback: {traceback.format_exc()}")
return "click here" # Fallback anchor text
class CitationExpert(SIFBaseAgent):
"""
Agent for fact-checking, citation generation, and evidence verification.
"""
EVIDENCE_THRESHOLD = 0.7 # Minimum relevance score for evidence
MAX_EVIDENCE = 5 # Maximum number of evidence pieces to return
async def fact_checker(self, claim: str) -> List[Dict[str, Any]]:
"""
Tool: Verifies facts against trusted research data.
Returns supporting or contradicting evidence.
"""
return await self.verify_facts(claim)
async def citation_finder(self, topic: str) -> List[Dict[str, Any]]:
"""
Tool: Suggests authoritative citations for a given topic.
"""
self._log_agent_operation("Finding citations", topic=topic)
try:
if not self.intelligence.is_initialized():
return []
# Search for highly relevant content
results = await self.intelligence.search(topic, limit=self.MAX_EVIDENCE)
citations = []
for result in results:
relevance = result.get('score', 0.0)
if relevance > 0.6:
citations.append({
"source": result.get('id'),
"title": result.get('text', '')[:100] + "...",
"relevance": relevance,
"citation_text": f"Source: {result.get('id')} (Relevance: {relevance:.2f})"
})
return citations
except Exception as e:
logger.error(f"[{self.__class__.__name__}] Citation finder failed: {e}")
return []
async def claim_verifier(self, content: str) -> Dict[str, Any]:
"""
Tool: Detects unsupported statements and hallucinations.
"""
self._log_agent_operation("Verifying claims in content", content_length=len(content))
# 1. Extract potential claims (heuristic: numbers, 'research shows', etc.)
# This is a simplified extraction. A real implementation would use NLP/LLM.
claims = []
sentences = content.split('.')
for sent in sentences:
if any(char.isdigit() for char in sent) or "show" in sent.lower() or "study" in sent.lower():
if len(sent.strip()) > 20:
claims.append(sent.strip())
if not claims:
return {"status": "no_claims_detected", "verified_claims": []}
verified_results = []
for claim in claims[:5]: # Limit to top 5 claims for performance
evidence = await self.verify_facts(claim)
status = "supported" if evidence else "unsupported"
verified_results.append({
"claim": claim,
"status": status,
"evidence_count": len(evidence),
"top_evidence": evidence[0]['source'] if evidence else None
})
return {
"status": "verification_complete",
"total_claims": len(claims),
"verified_claims": verified_results,
"unsupported_count": len([c for c in verified_results if c['status'] == 'unsupported']),
"timestamp": datetime.utcnow().isoformat()
}
async def verify_facts(self, claim: str) -> List[Dict[str, Any]]:
"""Find supporting or contradicting evidence in the indexed research."""
self._log_agent_operation("Verifying facts", claim_length=len(claim))
try:
if not self.intelligence.is_initialized():
logger.error(f"[{self.__class__.__name__}] Intelligence service not initialized")
return []
if not claim or len(claim.strip()) < 20:
logger.warning(f"[{self.__class__.__name__}] Claim too short for meaningful verification")
return []
results = await self.intelligence.search(claim, limit=self.MAX_EVIDENCE)
if not results:
logger.info(f"[{self.__class__.__name__}] No evidence found for claim")
return []
evidence = []
for result in results:
relevance_score = result.get('score', 0.0)
if relevance_score >= self.EVIDENCE_THRESHOLD:
evidence_piece = {
"source": result.get('id', 'unknown'),
"relevance": relevance_score,
"confidence": self._calculate_evidence_confidence(relevance_score),
"type": "supporting" if relevance_score > 0.8 else "related",
"excerpt": result.get('text', '')[:200] + "..." if len(result.get('text', '')) > 200 else result.get('text', '')
}
evidence.append(evidence_piece)
logger.debug(f"[{self.__class__.__name__}] Found evidence: {evidence_piece['source']} (score: {relevance_score:.3f})")
logger.info(f"[{self.__class__.__name__}] Found {len(evidence)} pieces of evidence for claim")
return evidence
except Exception as e:
logger.error(f"[{self.__class__.__name__}] Failed to verify facts: {e}")
logger.error(f"[{self.__class__.__name__}] Full traceback: {traceback.format_exc()}")
return []
def _calculate_evidence_confidence(self, relevance_score: float) -> float:
"""Calculate confidence score for evidence."""
# Simple confidence based on relevance score
return min(1.0, relevance_score * 1.2)