729 lines
32 KiB
Python
729 lines
32 KiB
Python
"""
|
|
Phase 2B: Real-Time Semantic Dashboard
|
|
|
|
This module implements a real-time semantic monitoring dashboard for ongoing
|
|
content analysis, competitor tracking, and semantic health monitoring.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import time
|
|
from typing import Dict, List, Any, Optional, Set
|
|
from datetime import datetime, timedelta
|
|
from dataclasses import dataclass, asdict
|
|
from loguru import logger
|
|
|
|
from ..txtai_service import TxtaiIntelligenceService
|
|
from ..semantic_cache import semantic_cache_manager
|
|
from ..sif_integration import SIFIntegrationService
|
|
# Agent imports will be done lazily to avoid circular imports
|
|
|
|
|
|
@dataclass
|
|
class SemanticHealthMetric:
|
|
"""Represents a semantic health metric for monitoring."""
|
|
metric_name: str
|
|
value: float
|
|
threshold: float
|
|
status: str # "healthy", "warning", "critical"
|
|
timestamp: str
|
|
description: str
|
|
recommendations: List[str]
|
|
|
|
|
|
@dataclass
|
|
class CompetitorSemanticSnapshot:
|
|
"""Snapshot of competitor semantic positioning."""
|
|
competitor_id: str
|
|
competitor_name: str
|
|
semantic_overlap: float
|
|
unique_topics: List[str]
|
|
content_volume: int
|
|
authority_score: float
|
|
last_updated: str
|
|
trending_topics: List[str]
|
|
|
|
|
|
@dataclass
|
|
class ContentSemanticInsight:
|
|
"""Represents an actionable content insight."""
|
|
insight_id: str
|
|
insight_type: str # 'gap', 'trend', 'optimization', 'threat'
|
|
title: str
|
|
description: str
|
|
confidence_score: float # 0.0 to 1.0
|
|
impact_score: float # 0.0 to 10.0
|
|
related_topics: List[str]
|
|
suggested_actions: List[str]
|
|
created_at: str
|
|
expires_at: str
|
|
source_agent: str = "SIF Intelligence" # New field for agent attribution
|
|
|
|
|
|
class RealTimeSemanticMonitor:
|
|
"""
|
|
Real-time semantic monitoring system for content and competitor analysis.
|
|
|
|
Features:
|
|
- Continuous semantic health monitoring
|
|
- Real-time competitor tracking
|
|
- Content performance analysis
|
|
- Automated alerting system
|
|
- Trend detection and forecasting
|
|
"""
|
|
|
|
def __init__(self, user_id: str):
|
|
self.user_id = user_id
|
|
self.intelligence_service = TxtaiIntelligenceService(user_id)
|
|
self.cache_manager = semantic_cache_manager
|
|
self.sif_service = SIFIntegrationService(user_id)
|
|
|
|
# Initialize monitoring agents (lazy initialization to avoid circular imports)
|
|
self.strategy_agent = None
|
|
self.guardian_agent = None
|
|
self.link_agent = None
|
|
|
|
# Monitoring configuration
|
|
self.monitoring_interval = 300 # 5 minutes
|
|
self.health_thresholds = {
|
|
"semantic_diversity": 0.6,
|
|
"content_freshness": 0.7,
|
|
"competitor_gap": 0.5,
|
|
"authority_score": 0.4
|
|
}
|
|
|
|
# Monitoring state
|
|
self.is_monitoring = False
|
|
self.monitored_competitors: Set[str] = set()
|
|
self.alert_subscribers: List[str] = []
|
|
self.monitoring_history: List[Dict[str, Any]] = []
|
|
|
|
logger.info(f"Real-time semantic monitor initialized for user {user_id}")
|
|
|
|
async def check_semantic_health(self, user_id: Optional[str] = None) -> SemanticHealthMetric:
|
|
"""
|
|
Public wrapper for semantic health check.
|
|
Aggregates metrics into a single health status object.
|
|
"""
|
|
# Call internal method (ignoring user_id arg if passed, as we use self.user_id)
|
|
metrics = await self._check_semantic_health()
|
|
|
|
if not metrics:
|
|
# Return a canonical semantic health summary when no metrics are available.
|
|
return SemanticHealthMetric(
|
|
metric_name="semantic_health",
|
|
value=0.0,
|
|
threshold=0.0,
|
|
status="warning",
|
|
timestamp=datetime.utcnow().isoformat(),
|
|
description="No semantic health metrics available yet",
|
|
recommendations=[
|
|
"Run semantic analysis to populate health metrics",
|
|
"Check data sources and try again shortly"
|
|
]
|
|
)
|
|
|
|
# Aggregate metrics
|
|
# 1. Status: "critical" if any critical, else "warning" if any warning, else "healthy"
|
|
status = "healthy"
|
|
for m in metrics:
|
|
if m.status == "critical":
|
|
status = "critical"
|
|
break
|
|
if m.status == "warning":
|
|
status = "warning"
|
|
|
|
# 2. Value: Average of metric values
|
|
avg_value = sum(m.value for m in metrics) / len(metrics)
|
|
|
|
# 3. Threshold: Average threshold across health metrics
|
|
avg_threshold = sum(m.threshold for m in metrics) / len(metrics)
|
|
|
|
# 4. Recommendations: de-duplicated recommendations from non-healthy metrics
|
|
recommendations = []
|
|
seen_recommendations = set()
|
|
for metric in metrics:
|
|
if metric.status != "healthy":
|
|
for recommendation in metric.recommendations:
|
|
if recommendation not in seen_recommendations:
|
|
seen_recommendations.add(recommendation)
|
|
recommendations.append(recommendation)
|
|
|
|
if not recommendations:
|
|
recommendations = ["Continue monitoring semantic performance"]
|
|
|
|
return SemanticHealthMetric(
|
|
metric_name="semantic_health",
|
|
value=avg_value,
|
|
threshold=avg_threshold,
|
|
status=status,
|
|
timestamp=datetime.utcnow().isoformat(),
|
|
description="Aggregated semantic health across monitoring metrics",
|
|
recommendations=recommendations,
|
|
)
|
|
|
|
async def start_monitoring(self, competitors: List[str] = None) -> bool:
|
|
"""Start real-time semantic monitoring."""
|
|
try:
|
|
self.is_monitoring = True
|
|
if competitors:
|
|
self.monitored_competitors = set(competitors)
|
|
|
|
logger.info(f"Started semantic monitoring for user {self.user_id}")
|
|
logger.info(f"Monitoring {len(self.monitored_competitors)} competitors")
|
|
|
|
# Start background monitoring task
|
|
asyncio.create_task(self._monitoring_loop())
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to start semantic monitoring: {e}")
|
|
return False
|
|
|
|
async def stop_monitoring(self) -> bool:
|
|
"""Stop real-time semantic monitoring."""
|
|
try:
|
|
self.is_monitoring = False
|
|
logger.info(f"Stopped semantic monitoring for user {self.user_id}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to stop semantic monitoring: {e}")
|
|
return False
|
|
|
|
async def _monitoring_loop(self):
|
|
"""Main monitoring loop that runs continuously."""
|
|
while self.is_monitoring:
|
|
try:
|
|
logger.info(f"Running semantic health check for user {self.user_id}")
|
|
|
|
# Perform comprehensive semantic analysis
|
|
health_metrics = await self._check_semantic_health()
|
|
competitor_updates = await self._monitor_competitors()
|
|
content_insights = await self._analyze_content_performance()
|
|
|
|
# Store monitoring snapshot
|
|
snapshot = {
|
|
"timestamp": datetime.now().isoformat(),
|
|
"user_id": self.user_id,
|
|
"health_metrics": [asdict(metric) for metric in health_metrics],
|
|
"competitor_updates": [asdict(update) for update in competitor_updates],
|
|
"content_insights": [asdict(insight) for insight in content_insights]
|
|
}
|
|
|
|
self.monitoring_history.append(snapshot)
|
|
|
|
# Keep only last 24 hours of history
|
|
cutoff_time = datetime.now() - timedelta(hours=24)
|
|
self.monitoring_history = [
|
|
h for h in self.monitoring_history
|
|
if datetime.fromisoformat(h["timestamp"]) > cutoff_time
|
|
]
|
|
|
|
# Check for alerts
|
|
await self._check_alerts(health_metrics, competitor_updates, content_insights)
|
|
|
|
# Cache results for dashboard
|
|
await self._cache_monitoring_results(snapshot)
|
|
|
|
logger.info(f"Semantic monitoring cycle completed. Next check in {self.monitoring_interval}s")
|
|
|
|
# Wait for next cycle
|
|
await asyncio.sleep(self.monitoring_interval)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in semantic monitoring loop: {e}")
|
|
await asyncio.sleep(self.monitoring_interval) # Continue even on error
|
|
|
|
async def _check_semantic_health(self) -> List[SemanticHealthMetric]:
|
|
"""Check overall semantic health of user's content."""
|
|
metrics = []
|
|
|
|
try:
|
|
# Get current semantic insights
|
|
insights = await self.sif_service.get_semantic_insights({"user_id": self.user_id})
|
|
|
|
if insights.get("source") == "error":
|
|
logger.warning("Failed to get semantic insights for health check")
|
|
return metrics
|
|
|
|
insights_data = insights.get("insights", {})
|
|
|
|
# Semantic diversity metric
|
|
content_pillars = insights_data.get("content_pillars", [])
|
|
semantic_diversity = len(content_pillars) / 10.0 # Normalize to 0-1
|
|
|
|
diversity_status = "healthy" if semantic_diversity >= self.health_thresholds["semantic_diversity"] else "warning"
|
|
metrics.append(SemanticHealthMetric(
|
|
metric_name="semantic_diversity",
|
|
value=semantic_diversity,
|
|
threshold=self.health_thresholds["semantic_diversity"],
|
|
status=diversity_status,
|
|
timestamp=datetime.now().isoformat(),
|
|
description=f"Content covers {len(content_pillars)} semantic pillars",
|
|
recommendations=["Expand content topics", "Explore new semantic areas"] if diversity_status == "warning" else []
|
|
))
|
|
|
|
# Content freshness metric (based on recent updates)
|
|
freshness_score = await self._calculate_content_freshness()
|
|
freshness_status = "healthy" if freshness_score >= self.health_thresholds["content_freshness"] else "warning"
|
|
|
|
metrics.append(SemanticHealthMetric(
|
|
metric_name="content_freshness",
|
|
value=freshness_score,
|
|
threshold=self.health_thresholds["content_freshness"],
|
|
status=freshness_status,
|
|
timestamp=datetime.now().isoformat(),
|
|
description="Content freshness based on recent semantic updates",
|
|
recommendations=["Update content regularly", "Monitor trending topics"] if freshness_status == "warning" else []
|
|
))
|
|
|
|
# Authority score metric
|
|
authority_score = await self._calculate_authority_score()
|
|
authority_status = "healthy" if authority_score >= self.health_thresholds["authority_score"] else "critical"
|
|
|
|
metrics.append(SemanticHealthMetric(
|
|
metric_name="authority_score",
|
|
value=authority_score,
|
|
threshold=self.health_thresholds["authority_score"],
|
|
status=authority_status,
|
|
timestamp=datetime.now().isoformat(),
|
|
description="Semantic authority based on content depth and relevance",
|
|
recommendations=["Create authoritative content", "Build topical expertise"] if authority_status != "healthy" else []
|
|
))
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to check semantic health: {e}")
|
|
|
|
return metrics
|
|
|
|
async def _monitor_competitors(self) -> List[CompetitorSemanticSnapshot]:
|
|
"""Monitor competitor semantic positioning."""
|
|
snapshots = []
|
|
try:
|
|
# 1. Get competitors from SIF integration
|
|
# We assume SIFIntegrationService has methods to get competitor data or we query index
|
|
# Let's try to search for "competitor_analysis" type in txtai index
|
|
results = await self.intelligence_service.search("competitor analysis", limit=10)
|
|
|
|
competitors_found = []
|
|
if results:
|
|
for res in results:
|
|
try:
|
|
metadata_str = res.get('object')
|
|
metadata = json.loads(metadata_str) if isinstance(metadata_str, str) else (metadata_str or res)
|
|
if metadata.get('type') == 'competitor_analysis':
|
|
competitors_found.append(metadata)
|
|
except: continue
|
|
|
|
# If no semantic data found, try fallback to DB/Integration service logic if needed
|
|
# For now, if we found semantic docs:
|
|
for comp_meta in competitors_found:
|
|
try:
|
|
full_report = comp_meta.get('full_report', {})
|
|
domain = comp_meta.get('url', 'Unknown')
|
|
|
|
# Calculate real metrics from the full report
|
|
# Use semantic overlap from SIF if available, or estimate
|
|
overlap = full_report.get('semantic_overlap', 0.5)
|
|
|
|
# Extract topics from the analysis content
|
|
topics = full_report.get('content_topics', [])
|
|
if not topics and 'analysis' in full_report:
|
|
# Try to extract from unstructured text if structured topics missing
|
|
topics = ["General Strategy"] # Fallback
|
|
|
|
snapshot = CompetitorSemanticSnapshot(
|
|
competitor_id=f"comp_{domain}",
|
|
competitor_name=domain,
|
|
semantic_overlap=overlap,
|
|
unique_topics=topics[:5],
|
|
content_volume=full_report.get('page_count', 0),
|
|
authority_score=full_report.get('authority_score', 0.5),
|
|
last_updated=comp_meta.get('timestamp', datetime.now().isoformat()),
|
|
trending_topics=full_report.get('trending_topics', [])
|
|
)
|
|
snapshots.append(snapshot)
|
|
except Exception as e:
|
|
logger.error(f"Error processing competitor snapshot: {e}")
|
|
|
|
if not snapshots and self.monitored_competitors:
|
|
# Fallback for manually added competitors that might not be fully indexed yet
|
|
for competitor in self.monitored_competitors:
|
|
snapshots.append(CompetitorSemanticSnapshot(
|
|
competitor_id=f"comp_{competitor}",
|
|
competitor_name=competitor,
|
|
semantic_overlap=0.0,
|
|
unique_topics=["Pending Analysis"],
|
|
content_volume=0,
|
|
authority_score=0.0,
|
|
last_updated=datetime.now().isoformat(),
|
|
trending_topics=[]
|
|
))
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to monitor competitors: {e}")
|
|
|
|
return snapshots
|
|
|
|
async def _analyze_content_performance(self) -> List[ContentSemanticInsight]:
|
|
"""Analyze content performance and identify insights using SIF Agents."""
|
|
insights = []
|
|
|
|
try:
|
|
current_time = datetime.now()
|
|
|
|
# 1. Initialize Agents if needed (lazy load to avoid circular imports)
|
|
if not self.strategy_agent:
|
|
from ..agents.specialized_agents import StrategyArchitectAgent, ContentStrategyAgent, CompetitorResponseAgent
|
|
self.strategy_agent = StrategyArchitectAgent(self.user_id)
|
|
self.content_agent = ContentStrategyAgent(self.user_id)
|
|
self.competitor_agent = CompetitorResponseAgent(self.user_id)
|
|
|
|
# 2. Get Real Insights from Agents
|
|
# Content Gaps
|
|
try:
|
|
# We can reuse the propose_daily_tasks logic or call specific methods
|
|
# Let's manually construct a "gap analysis" context for the agent
|
|
gap_context = {"analysis_type": "gaps", "website_url": "user_site"}
|
|
# Ideally we call a specific method like find_semantic_gaps if available publicly
|
|
# But propose_daily_tasks returns TaskProposal objects.
|
|
# Let's check if we can get raw insights.
|
|
# The agents have methods like find_semantic_gaps (StrategyArchitect)
|
|
|
|
# Using StrategyArchitect for pillar/gap analysis
|
|
if hasattr(self.strategy_agent, 'find_semantic_gaps'):
|
|
logger.warning(
|
|
"Skipping direct semantic gap method invocation for user_id={} due to missing competitor index context",
|
|
self.user_id,
|
|
)
|
|
else:
|
|
logger.warning(
|
|
"Strategy agent missing find_semantic_gaps for user_id={}, using dashboard-context fallback",
|
|
self.user_id,
|
|
)
|
|
|
|
# Alternative: Query SIF directly for "content gaps" if they are indexed as such
|
|
# Or generate them now via LLM + SIF Context
|
|
|
|
# Let's generate ONE high quality insight via ContentStrategyAgent
|
|
# We'll simulate a task proposal request but specifically for "insights"
|
|
# Actually, let's look at SIFIntegrationService.get_content_strategy_context
|
|
|
|
# For now, to fix the "mock data" issue quickly:
|
|
# We will check if we have ANY data in SIF.
|
|
# If yes, we generate dynamic insights based on that data.
|
|
|
|
dashboard_context = await self.sif_service.get_seo_dashboard_context()
|
|
if "error" not in dashboard_context:
|
|
data = dashboard_context.get("dashboard_data", {})
|
|
summary = data.get("summary", {})
|
|
|
|
# Insight 1: Performance Trend
|
|
ctr = summary.get("ctr", 0)
|
|
if ctr < 0.02:
|
|
insights.append(ContentSemanticInsight(
|
|
insight_id="perf_low_ctr",
|
|
insight_type="opportunity",
|
|
title="Low CTR Opportunity",
|
|
description=f"Your average CTR is {ctr:.1%}. Optimizing meta descriptions could boost traffic.",
|
|
confidence_score=0.9,
|
|
impact_score=8.0,
|
|
related_topics=["meta tags", "titles", "ctr optimization"],
|
|
suggested_actions=["Rewrite titles for high-impression low-click pages"],
|
|
created_at=current_time.isoformat(),
|
|
expires_at=(current_time + timedelta(days=7)).isoformat(),
|
|
source_agent="SEO Specialist Agent"
|
|
))
|
|
|
|
# Insight 2: Keyword Opportunities (from AI insights in dashboard data)
|
|
ai_insights = data.get("ai_insights", [])
|
|
for i, ai_ins in enumerate(ai_insights[:2]): # Take top 2
|
|
insights.append(ContentSemanticInsight(
|
|
insight_id=f"ai_insight_{i}",
|
|
insight_type="trend", # Map category
|
|
title=f"AI Recommendation: {ai_ins.get('category', 'General')}",
|
|
description=ai_ins.get('insight', 'No description'),
|
|
confidence_score=0.85,
|
|
impact_score=7.5,
|
|
related_topics=[ai_ins.get('category', 'seo')],
|
|
suggested_actions=[ai_ins.get('insight')], # Simplification
|
|
created_at=current_time.isoformat(),
|
|
expires_at=(current_time + timedelta(days=7)).isoformat(),
|
|
source_agent="Strategy Architect Agent"
|
|
))
|
|
|
|
if not ai_insights:
|
|
logger.warning(
|
|
"Dashboard context returned no ai_insights for user_id={}, insight generation is degraded",
|
|
self.user_id,
|
|
)
|
|
else:
|
|
logger.warning(
|
|
"SEO dashboard context unavailable for user_id={}, using fallback insight only",
|
|
self.user_id,
|
|
)
|
|
|
|
except Exception as agent_err:
|
|
logger.warning(f"Agent insight generation failed: {agent_err}")
|
|
|
|
# If still no insights (e.g. no dashboard data), AND we have no fallback,
|
|
# THEN we might return an empty list or a "Setup" insight.
|
|
if not insights:
|
|
insights.append(ContentSemanticInsight(
|
|
insight_id="setup_001",
|
|
insight_type="gap",
|
|
title="Awaiting Data Analysis",
|
|
description="Connect Search Console or complete competitor analysis to see real-time insights.",
|
|
confidence_score=1.0,
|
|
impact_score=5.0,
|
|
related_topics=["onboarding"],
|
|
suggested_actions=["Complete Step 5 Onboarding"],
|
|
created_at=current_time.isoformat(),
|
|
expires_at=(current_time + timedelta(days=1)).isoformat(),
|
|
source_agent="Onboarding Assistant"
|
|
))
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to analyze content performance: {e}")
|
|
|
|
return insights
|
|
|
|
async def _calculate_content_freshness(self) -> float:
|
|
"""Calculate content freshness score."""
|
|
# This would analyze actual content timestamps and updates
|
|
return 0.85 # Placeholder
|
|
|
|
async def _calculate_authority_score(self) -> float:
|
|
"""Calculate semantic authority score."""
|
|
# This would analyze content depth, backlinks, engagement, etc.
|
|
return 0.72 # Placeholder
|
|
|
|
async def _check_alerts(self, health_metrics: List[SemanticHealthMetric],
|
|
competitor_updates: List[CompetitorSemanticSnapshot],
|
|
content_insights: List[ContentSemanticInsight]):
|
|
"""Check for alert conditions and notify subscribers."""
|
|
alerts = []
|
|
|
|
# Check health metrics for critical conditions
|
|
for metric in health_metrics:
|
|
if metric.status == "critical":
|
|
alerts.append({
|
|
"type": "health_critical",
|
|
"title": f"Critical: {metric.metric_name}",
|
|
"message": metric.description,
|
|
"severity": "critical",
|
|
"timestamp": datetime.now().isoformat()
|
|
})
|
|
|
|
# Check for high-impact insights
|
|
for insight in content_insights:
|
|
if insight.impact_score >= 8.0:
|
|
alerts.append({
|
|
"type": "high_impact_insight",
|
|
"title": f"High Impact: {insight.title}",
|
|
"message": insight.description,
|
|
"severity": "warning",
|
|
"timestamp": datetime.now().isoformat()
|
|
})
|
|
|
|
# Send alerts to subscribers
|
|
if alerts:
|
|
try:
|
|
from services.agent_activity_service import AgentActivityService
|
|
from services.database import get_session_for_user
|
|
|
|
db = get_session_for_user(self.user_id)
|
|
if db:
|
|
service = AgentActivityService(db, self.user_id)
|
|
for alert in alerts:
|
|
alert_type = alert.get("type") or "semantic_alert"
|
|
severity = alert.get("severity") or "info"
|
|
mapped_severity = "error" if severity == "critical" else ("warning" if severity == "warning" else "info")
|
|
dedupe_key = None
|
|
if alert_type == "health_critical":
|
|
dedupe_key = f"semantic_health_critical:{alert.get('title')}:{datetime.utcnow().date().isoformat()}"
|
|
elif alert_type == "high_impact_insight":
|
|
dedupe_key = f"semantic_high_impact:{alert.get('title')}:{datetime.utcnow().date().isoformat()}"
|
|
|
|
service.create_alert(
|
|
alert_type=alert_type,
|
|
title=alert.get("title") or "Semantic alert",
|
|
message=alert.get("message") or "",
|
|
severity=mapped_severity,
|
|
payload=alert,
|
|
cta_path="/seo-dashboard",
|
|
dedupe_key=dedupe_key,
|
|
)
|
|
db.close()
|
|
except Exception as alert_err:
|
|
logger.warning(
|
|
"Unable to persist semantic alerts for user_id={} error_class={} error_message={}",
|
|
self.user_id,
|
|
type(alert_err).__name__,
|
|
str(alert_err),
|
|
)
|
|
await self._send_alerts(alerts)
|
|
|
|
async def get_cache_stats(self) -> Dict[str, Any]:
|
|
"""Get semantic cache statistics."""
|
|
return self.cache_manager.get_stats()
|
|
|
|
async def _send_alerts(self, alerts: List[Dict[str, Any]]):
|
|
"""Send alerts to subscribed users."""
|
|
for alert in alerts:
|
|
logger.warning(f"ALERT: {alert['title']} - {alert['message']}")
|
|
# Here you would integrate with notification systems (email, Slack, etc.)
|
|
|
|
async def _cache_monitoring_results(self, snapshot: Dict[str, Any]):
|
|
"""Cache monitoring results for dashboard access."""
|
|
try:
|
|
cache_key = f"semantic_monitoring_{self.user_id}"
|
|
self.cache_manager.set(
|
|
cache_key,
|
|
self.user_id,
|
|
snapshot,
|
|
ttl=300 # 5 minutes
|
|
)
|
|
|
|
logger.debug(f"Cached monitoring results for user {self.user_id}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to cache monitoring results: {e}")
|
|
|
|
def get_dashboard_data(self) -> Dict[str, Any]:
|
|
"""Get current dashboard data for the user."""
|
|
try:
|
|
# Get cached monitoring results
|
|
cache_key = f"semantic_monitoring_{self.user_id}"
|
|
cached_data = self.cache_manager.get(cache_key, self.user_id)
|
|
|
|
if cached_data:
|
|
return {
|
|
"status": "active" if self.is_monitoring else "inactive",
|
|
"last_updated": cached_data.get("timestamp"),
|
|
"health_metrics": cached_data.get("health_metrics", []),
|
|
"competitor_updates": cached_data.get("competitor_updates", []),
|
|
"content_insights": cached_data.get("content_insights", []),
|
|
"monitored_competitors": list(self.monitored_competitors),
|
|
"monitoring_interval": self.monitoring_interval
|
|
}
|
|
|
|
# Return default data if no cache
|
|
return {
|
|
"status": "inactive",
|
|
"last_updated": datetime.now().isoformat(),
|
|
"health_metrics": [],
|
|
"competitor_updates": [],
|
|
"content_insights": [],
|
|
"monitored_competitors": list(self.monitored_competitors),
|
|
"monitoring_interval": self.monitoring_interval
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get dashboard data: {e}")
|
|
return {"error": str(e)}
|
|
|
|
def get_monitoring_history(self, hours: int = 24) -> List[Dict[str, Any]]:
|
|
"""Get monitoring history for the specified number of hours."""
|
|
cutoff_time = datetime.now() - timedelta(hours=hours)
|
|
return [
|
|
h for h in self.monitoring_history
|
|
if datetime.fromisoformat(h["timestamp"]) > cutoff_time
|
|
]
|
|
|
|
|
|
class SemanticDashboardAPI:
|
|
"""API interface for the semantic monitoring dashboard."""
|
|
|
|
def __init__(self):
|
|
self.monitors: Dict[str, RealTimeSemanticMonitor] = {}
|
|
|
|
def get_monitor(self, user_id: str) -> RealTimeSemanticMonitor:
|
|
"""Get or create a semantic monitor for a user."""
|
|
if user_id not in self.monitors:
|
|
self.monitors[user_id] = RealTimeSemanticMonitor(user_id)
|
|
return self.monitors[user_id]
|
|
|
|
async def start_dashboard_monitoring(self, user_id: str, competitors: List[str] = None) -> Dict[str, Any]:
|
|
"""Start semantic monitoring for a user."""
|
|
monitor = self.get_monitor(user_id)
|
|
success = await monitor.start_monitoring(competitors)
|
|
|
|
return {
|
|
"user_id": user_id,
|
|
"monitoring_started": success,
|
|
"competitors": competitors or [],
|
|
"timestamp": datetime.now().isoformat()
|
|
}
|
|
|
|
async def stop_dashboard_monitoring(self, user_id: str) -> Dict[str, Any]:
|
|
"""Stop semantic monitoring for a user."""
|
|
monitor = self.get_monitor(user_id)
|
|
success = await monitor.stop_monitoring()
|
|
|
|
return {
|
|
"user_id": user_id,
|
|
"monitoring_stopped": success,
|
|
"timestamp": datetime.now().isoformat()
|
|
}
|
|
|
|
def get_dashboard_data(self, user_id: str) -> Dict[str, Any]:
|
|
"""Get current dashboard data for a user."""
|
|
monitor = self.get_monitor(user_id)
|
|
return monitor.get_dashboard_data()
|
|
|
|
def get_monitoring_history(self, user_id: str, hours: int = 24) -> List[Dict[str, Any]]:
|
|
"""Get monitoring history for a user."""
|
|
monitor = self.get_monitor(user_id)
|
|
return monitor.get_monitoring_history(hours)
|
|
|
|
|
|
# Global API instance
|
|
semantic_dashboard_api = SemanticDashboardAPI()
|
|
|
|
|
|
# Example usage and testing
|
|
async def test_semantic_dashboard():
|
|
"""Test the real-time semantic dashboard."""
|
|
logger.info("Testing Real-Time Semantic Dashboard")
|
|
|
|
# Create test monitor
|
|
user_id = "test_user_dashboard"
|
|
competitors = ["competitor1.com", "competitor2.com", "competitor3.com"]
|
|
|
|
# Start monitoring
|
|
logger.info("Starting semantic monitoring...")
|
|
start_result = await semantic_dashboard_api.start_dashboard_monitoring(user_id, competitors)
|
|
logger.info(f"Monitoring started: {start_result}")
|
|
|
|
# Wait a bit for monitoring to collect data
|
|
logger.info("Waiting for monitoring data collection...")
|
|
await asyncio.sleep(10)
|
|
|
|
# Get dashboard data
|
|
logger.info("Getting dashboard data...")
|
|
dashboard_data = semantic_dashboard_api.get_dashboard_data(user_id)
|
|
logger.info(f"Dashboard status: {dashboard_data.get('status')}")
|
|
logger.info(f"Health metrics: {len(dashboard_data.get('health_metrics', []))}")
|
|
logger.info(f"Competitor updates: {len(dashboard_data.get('competitor_updates', []))}")
|
|
logger.info(f"Content insights: {len(dashboard_data.get('content_insights', []))}")
|
|
|
|
# Get monitoring history
|
|
logger.info("Getting monitoring history...")
|
|
history = semantic_dashboard_api.get_monitoring_history(user_id, hours=1)
|
|
logger.info(f"Monitoring history entries: {len(history)}")
|
|
|
|
# Stop monitoring
|
|
logger.info("Stopping semantic monitoring...")
|
|
stop_result = await semantic_dashboard_api.stop_dashboard_monitoring(user_id)
|
|
logger.info(f"Monitoring stopped: {stop_result}")
|
|
|
|
logger.info("Semantic Dashboard test completed successfully!")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Run test
|
|
asyncio.run(test_semantic_dashboard())
|