Recovered critical missing components: PerformanceMonitor, MarketSignalDetector, and SemanticDashboard

This commit is contained in:
ajaysi
2026-02-08 14:06:09 +05:30
parent e404a86502
commit 43e66835ac
3 changed files with 1837 additions and 165 deletions

View File

@@ -172,79 +172,646 @@ class MarketSignalDetector:
return prioritized_signals
except Exception as e:
logger.error(f"Error detecting market signals: {str(e)}")
logger.error(f"Error detecting market signals for user {self.user_id}: {e}")
return []
async def _get_signal_context(self) -> SignalContext:
"""Fetch current context for signal detection"""
# Placeholder implementation
return SignalContext(
user_id=self.user_id,
competitor_data={},
semantic_health={},
seo_performance={},
content_analysis={},
historical_data={}
)
def _is_cache_valid(self, signals: List[MarketSignal]) -> bool:
"""Check if cached signals are still valid"""
if not signals:
return False
# Basic check for now
return True
"""Get comprehensive context for signal detection"""
try:
# Get semantic health
semantic_health = await self.semantic_monitor.check_semantic_health(self.user_id)
# Get competitor data
competitor_data = await self._get_competitor_data()
# Get SEO performance
seo_performance = await self._get_seo_performance()
# Get content analysis
content_analysis = await self._get_content_analysis()
# Get historical data
historical_data = await self._get_historical_data()
return SignalContext(
user_id=self.user_id,
competitor_data=competitor_data,
semantic_health=semantic_health,
seo_performance=seo_performance,
content_analysis=content_analysis,
historical_data=historical_data
)
except Exception as e:
logger.error(f"Error getting signal context for user {self.user_id}: {e}")
# Return minimal context
return SignalContext(
user_id=self.user_id,
competitor_data={},
semantic_health={},
seo_performance={},
content_analysis={},
historical_data={}
)
async def _detect_competitor_signals(self, context: SignalContext) -> List[MarketSignal]:
"""Detect signals from competitor activities"""
return []
async def _detect_serp_signals(self, context: SignalContext) -> List[MarketSignal]:
"""Detect signals from SERP changes"""
return []
async def _detect_social_signals(self, context: SignalContext) -> List[MarketSignal]:
"""Detect signals from social trends"""
return []
async def _detect_industry_signals(self, context: SignalContext) -> List[MarketSignal]:
"""Detect signals from industry news"""
return []
async def _detect_performance_signals(self, context: SignalContext) -> List[MarketSignal]:
"""Detect signals from site performance"""
return []
async def _detect_content_gap_signals(self, context: SignalContext) -> List[MarketSignal]:
"""Detect signals from content gaps"""
return []
async def _detect_seo_opportunity_signals(self, context: SignalContext) -> List[MarketSignal]:
"""Detect signals from SEO opportunities"""
return []
def _filter_signals(self, signals: List[MarketSignal]) -> List[MarketSignal]:
"""Filter out low-quality or duplicate signals"""
"""Detect competitor-related market signals"""
signals = []
try:
competitor_data = context.competitor_data.get('competitors', [])
for competitor in competitor_data:
competitor_id = competitor.get('id')
competitor_name = competitor.get('name', 'Unknown Competitor')
# Check for significant changes in competitor metrics
current_metrics = {
'content_volume': competitor.get('content_volume', 0),
'semantic_overlap': competitor.get('semantic_overlap', 0),
'authority_score': competitor.get('authority_score', 0),
'trending_topics': len(competitor.get('trending_topics', []))
}
# Compare with baseline metrics
baseline_key = f"competitor_{competitor_id}"
baseline = self.baseline_metrics.get(baseline_key, current_metrics)
# Detect significant changes
for metric, current_value in current_metrics.items():
baseline_value = baseline.get(metric, current_value)
change_percentage = abs(current_value - baseline_value) / max(baseline_value, 1)
if change_percentage > self.thresholds['competitor_change_threshold']:
signal = MarketSignal(
signal_id=f"competitor_{competitor_id}_{metric}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}",
signal_type=SignalType.COMPETITOR_CHANGE,
source=competitor_name,
description=f"Competitor {competitor_name} shows significant change in {metric}: {change_percentage:.1%}",
impact_score=min(0.9, change_percentage * 2), # Cap at 0.9
urgency_level=self._determine_urgency(change_percentage),
confidence_score=0.8,
related_topics=competitor.get('trending_topics', [])[:3],
suggested_actions=self._get_competitor_response_actions(metric, change_percentage),
metadata={
'competitor_id': competitor_id,
'metric': metric,
'old_value': baseline_value,
'new_value': current_value,
'change_percentage': change_percentage
}
)
signals.append(signal)
# Update baseline
self.baseline_metrics[baseline_key] = current_metrics
except Exception as e:
logger.error(f"Error detecting competitor signals: {e}")
return signals
async def _detect_serp_signals(self, context: SignalContext) -> List[MarketSignal]:
"""Detect SERP-related market signals"""
signals = []
try:
seo_performance = context.seo_performance
# Check for significant SERP position changes
serp_changes = seo_performance.get('serp_changes', [])
for change in serp_changes:
keyword = change.get('keyword')
old_position = change.get('old_position', 100)
new_position = change.get('new_position', 100)
# Calculate position change
position_change = old_position - new_position # Positive = improvement
change_percentage = abs(position_change) / max(old_position, 1)
if change_percentage > self.thresholds['serp_fluctuation_threshold']:
if position_change > 0: # Improvement
description = f"Significant SERP improvement for '{keyword}': moved from {old_position} to {new_position}"
impact_score = min(0.8, change_percentage * 1.5)
urgency_level = UrgencyLevel.LOW
suggested_actions = ["Monitor trend", "Capitalize on improvement"]
else: # Decline
description = f"Significant SERP decline for '{keyword}': dropped from {old_position} to {new_position}"
impact_score = min(0.9, change_percentage * 2)
urgency_level = UrgencyLevel.HIGH
suggested_actions = ["Investigate cause", "Optimize content", "Check technical SEO"]
signal = MarketSignal(
signal_id=f"serp_{keyword.replace(' ', '_')}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}",
signal_type=SignalType.SERP_FLUCTUATION,
source="SERP Analysis",
description=description,
impact_score=impact_score,
urgency_level=urgency_level,
confidence_score=0.85,
related_topics=[keyword],
suggested_actions=suggested_actions,
metadata={
'keyword': keyword,
'old_position': old_position,
'new_position': new_position,
'position_change': position_change,
'change_percentage': change_percentage
}
)
signals.append(signal)
except Exception as e:
logger.error(f"Error detecting SERP signals: {e}")
return signals
async def _detect_social_signals(self, context: SignalContext) -> List[MarketSignal]:
"""Detect social media trend signals"""
signals = []
try:
# Get social media data
social_data = context.historical_data.get('social_metrics', {})
# Check for trending topics
trending_topics = social_data.get('trending_topics', [])
for topic in trending_topics:
topic_name = topic.get('topic')
engagement_rate = topic.get('engagement_rate', 0)
trend_score = topic.get('trend_score', 0)
if trend_score > self.thresholds['social_trend_threshold']:
signal = MarketSignal(
signal_id=f"social_{topic_name.replace(' ', '_')}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}",
signal_type=SignalType.SOCIAL_TREND,
source="Social Media Analysis",
description=f"Social trend detected: '{topic_name}' with engagement rate {engagement_rate:.2%}",
impact_score=min(0.8, trend_score * 1.5),
urgency_level=self._determine_urgency(trend_score),
confidence_score=0.75,
related_topics=[topic_name],
suggested_actions=["Create content on trending topic", "Monitor trend development", "Engage with trend"],
metadata={
'topic': topic_name,
'engagement_rate': engagement_rate,
'trend_score': trend_score,
'platforms': topic.get('platforms', [])
}
)
signals.append(signal)
except Exception as e:
logger.error(f"Error detecting social signals: {e}")
return signals
async def _detect_industry_signals(self, context: SignalContext) -> List[MarketSignal]:
"""Detect industry news and trend signals"""
signals = []
try:
# Get industry data
industry_data = context.historical_data.get('industry_news', {})
# Check for significant industry developments
news_items = industry_data.get('recent_news', [])
for news in news_items:
news_title = news.get('title', 'Industry News')
relevance_score = news.get('relevance_score', 0)
impact_assessment = news.get('impact_assessment', 'medium')
if relevance_score > 0.6: # High relevance to user's industry
signal = MarketSignal(
signal_id=f"industry_{hash(news_title) % 10000}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}",
signal_type=SignalType.INDUSTRY_NEWS,
source="Industry News Analysis",
description=f"Industry development: {news_title}",
impact_score=min(0.9, relevance_score * 1.2),
urgency_level=self._map_impact_to_urgency(impact_assessment),
confidence_score=0.8,
related_topics=news.get('related_topics', []),
suggested_actions=["Analyze industry impact", "Adjust strategy if needed", "Monitor competitor response"],
metadata={
'news_title': news_title,
'relevance_score': relevance_score,
'impact_assessment': impact_assessment,
'news_date': news.get('date'),
'source': news.get('source')
}
)
signals.append(signal)
except Exception as e:
logger.error(f"Error detecting industry signals: {e}")
return signals
async def _detect_performance_signals(self, context: SignalContext) -> List[MarketSignal]:
"""Detect performance change signals"""
signals = []
try:
# Get performance data
performance_data = context.historical_data.get('performance_metrics', {})
# Check for significant changes in key metrics
current_metrics = {
'traffic': performance_data.get('current_traffic', 0),
'engagement': performance_data.get('current_engagement', 0),
'conversion_rate': performance_data.get('current_conversion_rate', 0),
'bounce_rate': performance_data.get('current_bounce_rate', 0)
}
# Compare with historical baseline
baseline_metrics = performance_data.get('baseline_metrics', current_metrics)
for metric, current_value in current_metrics.items():
baseline_value = baseline_metrics.get(metric, current_value)
if baseline_value > 0: # Avoid division by zero
change_percentage = abs(current_value - baseline_value) / baseline_value
if change_percentage > self.thresholds['performance_change_threshold']:
if current_value > baseline_value: # Improvement
description = f"Performance improvement detected: {metric} increased by {change_percentage:.1%}"
impact_score = min(0.7, change_percentage * 1.5)
urgency_level = UrgencyLevel.LOW
suggested_actions = ["Monitor trend", "Analyze success factors", "Scale successful strategies"]
else: # Decline
description = f"Performance decline detected: {metric} decreased by {change_percentage:.1%}"
impact_score = min(0.9, change_percentage * 2)
urgency_level = UrgencyLevel.HIGH
suggested_actions = ["Investigate cause", "Implement corrective measures", "Monitor recovery"]
signal = MarketSignal(
signal_id=f"performance_{metric}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}",
signal_type=SignalType.PERFORMANCE_CHANGE,
source="Performance Analytics",
description=description,
impact_score=impact_score,
urgency_level=urgency_level,
confidence_score=0.9,
related_topics=[metric],
suggested_actions=suggested_actions,
metadata={
'metric': metric,
'old_value': baseline_value,
'new_value': current_value,
'change_percentage': change_percentage,
'trend_direction': 'up' if current_value > baseline_value else 'down'
}
)
signals.append(signal)
except Exception as e:
logger.error(f"Error detecting performance signals: {e}")
return signals
async def _detect_content_gap_signals(self, context: SignalContext) -> List[MarketSignal]:
"""Detect content gap signals"""
signals = []
try:
semantic_health = context.semantic_health
# Check for significant semantic gaps
semantic_gaps = semantic_health.get('semantic_gaps', [])
for gap in semantic_gaps:
gap_topic = gap.get('topic')
gap_score = gap.get('gap_score', 0)
competitor_coverage = gap.get('competitor_coverage', 0)
if gap_score > self.thresholds['content_gap_threshold']:
signal = MarketSignal(
signal_id=f"content_gap_{gap_topic.replace(' ', '_')}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}",
signal_type=SignalType.CONTENT_GAP,
source="Semantic Analysis",
description=f"Content gap identified: '{gap_topic}' with gap score {gap_score:.2f}",
impact_score=min(0.8, gap_score * 1.5),
urgency_level=self._determine_urgency(gap_score),
confidence_score=0.85,
related_topics=[gap_topic],
suggested_actions=["Create content on gap topic", "Analyze competitor approach", "Optimize existing content"],
metadata={
'gap_topic': gap_topic,
'gap_score': gap_score,
'competitor_coverage': competitor_coverage,
'semantic_similarity': gap.get('semantic_similarity', 0)
}
)
signals.append(signal)
except Exception as e:
logger.error(f"Error detecting content gap signals: {e}")
return signals
async def _detect_seo_opportunity_signals(self, context: SignalContext) -> List[MarketSignal]:
"""Detect SEO opportunity signals"""
signals = []
try:
seo_performance = context.seo_performance
# Check for SEO opportunities
seo_opportunities = seo_performance.get('opportunities', [])
for opportunity in seo_opportunities:
opportunity_type = opportunity.get('type')
opportunity_score = opportunity.get('opportunity_score', 0)
estimated_impact = opportunity.get('estimated_impact', 'medium')
if opportunity_score > self.thresholds['seo_opportunity_threshold']:
signal = MarketSignal(
signal_id=f"seo_opportunity_{opportunity_type}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}",
signal_type=SignalType.SEO_OPPORTUNITY,
source="SEO Analysis",
description=f"SEO opportunity identified: {opportunity_type} with score {opportunity_score:.2f}",
impact_score=min(0.8, opportunity_score * 1.5),
urgency_level=self._map_impact_to_urgency(estimated_impact),
confidence_score=0.8,
related_topics=opportunity.get('related_keywords', []),
suggested_actions=["Implement SEO recommendation", "Monitor impact", "Scale successful optimizations"],
metadata={
'opportunity_type': opportunity_type,
'opportunity_score': opportunity_score,
'estimated_impact': estimated_impact,
'implementation_effort': opportunity.get('implementation_effort', 'medium'),
'priority_score': opportunity.get('priority_score', 0)
}
)
signals.append(signal)
except Exception as e:
logger.error(f"Error detecting SEO opportunity signals: {e}")
return signals
# Helper methods
def _determine_urgency(self, score: float) -> UrgencyLevel:
"""Determine urgency level based on score"""
if score >= 0.8:
return UrgencyLevel.CRITICAL
elif score >= 0.6:
return UrgencyLevel.HIGH
elif score >= 0.3:
return UrgencyLevel.MEDIUM
else:
return UrgencyLevel.LOW
def _map_impact_to_urgency(self, impact: str) -> UrgencyLevel:
"""Map impact assessment to urgency level"""
impact_map = {
'critical': UrgencyLevel.CRITICAL,
'high': UrgencyLevel.HIGH,
'medium': UrgencyLevel.MEDIUM,
'low': UrgencyLevel.LOW
}
return impact_map.get(impact.lower(), UrgencyLevel.MEDIUM)
def _get_competitor_response_actions(self, metric: str, change_percentage: float) -> List[str]:
"""Get suggested actions for competitor changes"""
actions = []
if metric == 'content_volume':
if change_percentage > 0:
actions = ["Analyze competitor content strategy", "Identify content gaps", "Increase content production"]
else:
actions = ["Monitor competitor focus shift", "Identify new opportunities", "Maintain content quality"]
elif metric == 'semantic_overlap':
if change_percentage > 0:
actions = ["Differentiate content strategy", "Find unique angles", "Avoid keyword cannibalization"]
else:
actions = ["Explore new topics", "Expand content coverage", "Monitor competitor positioning"]
elif metric == 'authority_score':
if change_percentage > 0:
actions = ["Analyze competitor backlink strategy", "Improve content quality", "Build domain authority"]
else:
actions = ["Capitalize on competitor weakness", "Strengthen own authority", "Monitor recovery"]
else:
actions = ["Monitor competitor activity", "Analyze impact on market", "Adjust strategy if needed"]
return actions
def _filter_signals(self, signals: List[MarketSignal]) -> List[MarketSignal]:
"""Filter signals based on relevance and quality"""
filtered = []
for signal in signals:
# Skip low confidence signals
if signal.confidence_score < 0.5:
continue
# Skip expired signals
if self._is_signal_expired(signal):
continue
# Skip duplicate signals (same type and source within short timeframe)
if self._is_duplicate_signal(signal, filtered):
continue
filtered.append(signal)
return filtered
def _prioritize_signals(self, signals: List[MarketSignal]) -> List[MarketSignal]:
"""Prioritize signals based on impact and urgency"""
return sorted(signals, key=lambda x: (x.urgency_level.value, x.impact_score), reverse=True)
def _trim_signal_history(self):
"""Keep signal history within limits"""
if len(self.signal_history) > 1000:
self.signal_history = self.signal_history[-1000:]
class MarketTrendAnalyzer:
"""
Analyzer for detecting market trends from aggregated signals.
"""
def __init__(self, user_id: str):
self.user_id = user_id
self.detector = MarketSignalDetector(user_id)
# Sort by priority score (impact * urgency_weight)
def priority_score(signal: MarketSignal) -> float:
urgency_weights = {
UrgencyLevel.CRITICAL: 1.0,
UrgencyLevel.HIGH: 0.8,
UrgencyLevel.MEDIUM: 0.5,
UrgencyLevel.LOW: 0.2
}
urgency_weight = urgency_weights.get(signal.urgency_level, 0.5)
return signal.impact_score * urgency_weight * signal.confidence_score
async def analyze_trends(self, context: Optional[Dict[str, Any]] = None) -> List[MarketSignal]:
"""Analyze current market trends"""
# Placeholder implementation
logger.info(f"Analyzing market trends for user {self.user_id}")
return []
return sorted(signals, key=priority_score, reverse=True)
def _is_signal_expired(self, signal: MarketSignal) -> bool:
"""Check if signal has expired"""
try:
expires_at = datetime.fromisoformat(signal.expires_at)
return datetime.utcnow() > expires_at
except:
return False
def _is_duplicate_signal(self, signal: MarketSignal, existing_signals: List[MarketSignal]) -> bool:
"""Check if signal is a duplicate of recent signals"""
try:
signal_time = datetime.fromisoformat(signal.detected_at)
for existing in existing_signals:
if (existing.signal_type == signal.signal_type and
existing.source == signal.source and
existing.related_topics == signal.related_topics):
# Check if within 1 hour
existing_time = datetime.fromisoformat(existing.detected_at)
if abs((signal_time - existing_time).total_seconds()) < 3600:
return True
return False
except:
return False
def _is_cache_valid(self, cached_signals: List[MarketSignal]) -> bool:
"""Check if cached signals are still valid"""
if not cached_signals:
return False
try:
# Check if any signal is still valid (not expired)
for signal in cached_signals:
if not self._is_signal_expired(signal):
return True
return False
except:
return False
def _trim_signal_history(self):
"""Trim signal history to keep only recent signals"""
cutoff_time = datetime.utcnow().timestamp() - (7 * 24 * 60 * 60) # 7 days
self.signal_history = [
signal for signal in self.signal_history
if datetime.fromisoformat(signal.detected_at).timestamp() > cutoff_time
]
# Data retrieval methods (to be implemented with actual ALwrity services)
async def _get_competitor_data(self) -> Dict[str, Any]:
"""Get competitor data from existing services"""
# This will be implemented to integrate with existing competitor analysis
return {
'competitors': [],
'analysis_timestamp': datetime.utcnow().isoformat()
}
async def _get_seo_performance(self) -> Dict[str, Any]:
"""Get SEO performance data"""
# This will be implemented to integrate with existing SEO analysis
return {
'serp_changes': [],
'opportunities': [],
'analysis_timestamp': datetime.utcnow().isoformat()
}
async def _get_content_analysis(self) -> Dict[str, Any]:
"""Get content analysis data"""
# This will be implemented to integrate with existing content analysis
return {
'content_metrics': {},
'semantic_analysis': {},
'analysis_timestamp': datetime.utcnow().isoformat()
}
async def _get_historical_data(self) -> Dict[str, Any]:
"""Get historical data for trend analysis"""
# This will be implemented to get historical performance data
return {
'performance_metrics': {},
'social_metrics': {},
'industry_news': [],
'data_timestamp': datetime.utcnow().isoformat()
}
# Service class for market signal detection
class MarketSignalService:
"""Service class for market signal detection operations"""
def __init__(self):
self.detectors: Dict[str, MarketSignalDetector] = {}
self.signal_history: Dict[str, List[MarketSignal]] = {}
async def get_detector(self, user_id: str) -> MarketSignalDetector:
"""Get or create a market signal detector for a user"""
if user_id not in self.detectors:
self.detectors[user_id] = MarketSignalDetector(user_id)
return self.detectors[user_id]
async def detect_signals_for_user(self, user_id: str) -> List[MarketSignal]:
"""Detect market signals for a specific user"""
detector = await self.get_detector(user_id)
signals = await detector.detect_market_signals()
# Store in history
if user_id not in self.signal_history:
self.signal_history[user_id] = []
self.signal_history[user_id].extend(signals)
return signals
async def get_signal_summary(self, user_id: str) -> Dict[str, Any]:
"""Get summary of recent signals for a user"""
detector = await self.get_detector(user_id)
signals = await detector.detect_market_signals()
# Group by signal type
signals_by_type = {}
for signal in signals:
signal_type = signal.signal_type.value
if signal_type not in signals_by_type:
signals_by_type[signal_type] = []
signals_by_type[signal_type].append(signal)
# Calculate summary metrics
total_signals = len(signals)
high_priority_signals = len([s for s in signals if s.urgency_level in [UrgencyLevel.HIGH, UrgencyLevel.CRITICAL]])
average_impact_score = sum(s.impact_score for s in signals) / max(total_signals, 1)
return {
'user_id': user_id,
'total_signals': total_signals,
'high_priority_signals': high_priority_signals,
'average_impact_score': average_impact_score,
'signals_by_type': signals_by_type,
'latest_signals': signals[:5], # Top 5 most recent
'timestamp': datetime.utcnow().isoformat()
}
async def get_active_signals(self, user_id: str) -> List[MarketSignal]:
"""Get active (non-expired) signals for a user"""
detector = await self.get_detector(user_id)
all_signals = await detector.detect_market_signals()
# Filter active signals
active_signals = []
for signal in all_signals:
try:
expires_at = datetime.fromisoformat(signal.expires_at)
if datetime.utcnow() <= expires_at:
active_signals.append(signal)
except:
continue
return active_signals
# Global service instance
market_signal_service = MarketSignalService()
# Convenience functions
async def detect_market_signals(user_id: str) -> List[MarketSignal]:
"""Detect market signals for a user"""
return await market_signal_service.detect_signals_for_user(user_id)
async def get_market_signal_summary(user_id: str) -> Dict[str, Any]:
"""Get market signal summary for a user"""
return await market_signal_service.get_signal_summary(user_id)
async def get_active_market_signals(user_id: str) -> List[MarketSignal]:
"""Get active market signals for a user"""
return await market_signal_service.get_active_signals(user_id)

View File

@@ -17,112 +17,747 @@ from services.database import get_session_for_user
logger = get_service_logger(__name__)
class AgentStatus(Enum):
IDLE = "idle"
BUSY = "busy"
ERROR = "error"
OFFLINE = "offline"
INITIALIZING = "initializing"
class PerformanceMetric(Enum):
"""Types of performance metrics tracked"""
RESPONSE_TIME = "response_time"
SUCCESS_RATE = "success_rate"
TOKEN_USAGE = "token_usage"
COST_PER_ACTION = "cost_per_action"
RESOURCE_UTILIZATION = "resource_utilization"
GOAL_COMPLETION_RATE = "goal_completion_rate"
EFFICIENCY_SCORE = "efficiency_score"
RESOURCE_USAGE = "resource_usage"
USER_SATISFACTION = "user_satisfaction"
MARKET_IMPACT = "market_impact"
class AgentStatus(Enum):
"""Status of agent operations"""
ACTIVE = "active"
IDLE = "idle"
PROCESSING = "processing"
ERROR = "error"
MAINTENANCE = "maintenance"
@dataclass
class AgentPerformanceMetrics:
agent_id: str
timestamp: datetime
metrics: Dict[str, float]
class PerformanceDataPoint:
"""Single performance data point"""
timestamp: str
metric_type: PerformanceMetric
value: float
context: Dict[str, Any]
agent_id: str
user_id: str
class PerformanceMonitor:
"""
Monitors and analyzes agent performance metrics
"""
@dataclass
class AgentPerformanceSnapshot:
"""Complete performance snapshot for an agent"""
agent_id: str
user_id: str
timestamp: str
status: AgentStatus
total_actions: int
successful_actions: int
failed_actions: int
average_response_time: float
success_rate: float
efficiency_score: float
resource_usage: Dict[str, float]
market_impact_score: float
last_action_at: str
def __post_init__(self):
if self.timestamp is None:
self.timestamp = datetime.utcnow().isoformat()
@dataclass
class PerformanceTrend:
"""Performance trend analysis"""
metric_type: PerformanceMetric
trend_direction: str # "improving", "declining", "stable"
trend_strength: float # 0.0 to 1.0
change_rate: float # Percentage change per time unit
confidence: float # 0.0 to 1.0
period_start: str
period_end: str
@dataclass
class OptimizationRecommendation:
"""Performance optimization recommendation"""
recommendation_id: str
agent_id: str
user_id: str
recommendation_type: str
priority: str # "high", "medium", "low"
description: str
expected_impact: float # Expected improvement in performance
implementation_steps: List[str]
estimated_effort: str # "low", "medium", "high"
created_at: str
expires_at: str
def __post_init__(self):
if self.created_at is None:
self.created_at = datetime.utcnow().isoformat()
if self.expires_at is None:
# Default expiration: 7 days
expires = datetime.utcnow().timestamp() + (7 * 24 * 60 * 60)
self.expires_at = datetime.fromtimestamp(expires).isoformat()
class AgentPerformanceMonitor:
"""Main performance monitoring system for agents"""
def __init__(self, user_id: str):
self.user_id = user_id
self.performance_data: Dict[str, List[PerformanceDataPoint]] = defaultdict(list)
self.agent_snapshots: Dict[str, AgentPerformanceSnapshot] = {}
self.recommendations: List[OptimizationRecommendation] = []
self.performance_history: deque = deque(maxlen=1000) # Keep last 1000 data points
# Performance thresholds and targets
self.performance_targets = {
"success_rate": 0.85, # 85% success rate target
"response_time": 30.0, # 30 seconds average response time target
"efficiency_score": 0.75, # 75% efficiency score target
"market_impact": 0.60 # 60% market impact score target
}
# Alert thresholds
self.alert_thresholds = {
"success_rate": 0.70, # Alert if below 70%
"response_time": 60.0, # Alert if above 60 seconds
"efficiency_score": 0.50, # Alert if below 50%
"market_impact": 0.30 # Alert if below 30%
}
logger.info(f"Initialized AgentPerformanceMonitor for user: {user_id}")
async def record_performance_data(self, agent_id: str, metric_type: PerformanceMetric, value: float, context: Dict[str, Any] = None) -> bool:
"""Record a performance data point"""
try:
if context is None:
context = {}
data_point = PerformanceDataPoint(
timestamp=datetime.utcnow().isoformat(),
metric_type=metric_type,
value=value,
context=context,
agent_id=agent_id,
user_id=self.user_id
)
# Store in performance data
self.performance_data[agent_id].append(data_point)
self.performance_history.append(data_point)
# Keep only recent data (last 24 hours for real-time analysis)
cutoff_time = datetime.utcnow().timestamp() - (24 * 60 * 60)
self.performance_data[agent_id] = [
dp for dp in self.performance_data[agent_id]
if datetime.fromisoformat(dp.timestamp).timestamp() > cutoff_time
]
logger.debug(f"Recorded performance data for agent {agent_id}: {metric_type.value} = {value}")
return True
except Exception as e:
logger.error(f"Error recording performance data for agent {agent_id}: {e}")
return False
async def update_agent_snapshot(self, agent_id: str, status: AgentStatus, action_result: Dict[str, Any] = None) -> AgentPerformanceSnapshot:
"""Update performance snapshot for an agent"""
try:
# Get recent performance data
recent_data = self.performance_data[agent_id]
# Calculate metrics from recent data
total_actions = len([dp for dp in recent_data if dp.metric_type == PerformanceMetric.SUCCESS_RATE])
successful_actions = len([dp for dp in recent_data if dp.metric_type == PerformanceMetric.SUCCESS_RATE and dp.value > 0.5])
failed_actions = total_actions - successful_actions
# Calculate average response time
response_time_data = [dp.value for dp in recent_data if dp.metric_type == PerformanceMetric.RESPONSE_TIME]
avg_response_time = sum(response_time_data) / len(response_time_data) if response_time_data else 0.0
# Calculate success rate
success_rate = successful_actions / total_actions if total_actions > 0 else 0.0
# Calculate efficiency score
efficiency_data = [dp.value for dp in recent_data if dp.metric_type == PerformanceMetric.EFFICIENCY_SCORE]
avg_efficiency = sum(efficiency_data) / len(efficiency_data) if efficiency_data else 0.0
# Calculate market impact
market_impact_data = [dp.value for dp in recent_data if dp.metric_type == PerformanceMetric.MARKET_IMPACT]
avg_market_impact = sum(market_impact_data) / len(market_impact_data) if market_impact_data else 0.0
# Get resource usage
resource_usage = self._calculate_resource_usage(agent_id, recent_data)
# Get last action time
last_action_at = max([dp.timestamp for dp in recent_data], default=datetime.utcnow().isoformat()) if recent_data else datetime.utcnow().isoformat()
# Create snapshot
snapshot = AgentPerformanceSnapshot(
agent_id=agent_id,
user_id=self.user_id,
timestamp=datetime.utcnow().isoformat(),
status=status,
total_actions=total_actions,
successful_actions=successful_actions,
failed_actions=failed_actions,
average_response_time=avg_response_time,
success_rate=success_rate,
efficiency_score=avg_efficiency,
resource_usage=resource_usage,
market_impact_score=avg_market_impact,
last_action_at=last_action_at
)
self.agent_snapshots[agent_id] = snapshot
logger.info(f"Updated performance snapshot for agent {agent_id}: success_rate={success_rate:.2f}, efficiency={avg_efficiency:.2f}")
return snapshot
except Exception as e:
logger.error(f"Error updating performance snapshot for agent {agent_id}: {e}")
# Return a default snapshot
return AgentPerformanceSnapshot(
agent_id=agent_id,
user_id=self.user_id,
timestamp=datetime.utcnow().isoformat(),
status=AgentStatus.ERROR,
total_actions=0,
successful_actions=0,
failed_actions=0,
average_response_time=0.0,
success_rate=0.0,
efficiency_score=0.0,
resource_usage={},
market_impact_score=0.0,
last_action_at=datetime.utcnow().isoformat()
)
def _calculate_resource_usage(self, agent_id: str, recent_data: List[PerformanceDataPoint]) -> Dict[str, float]:
"""Calculate resource usage metrics"""
resource_usage = {
"cpu_usage": 0.0,
"memory_usage": 0.0,
"api_calls": 0,
"processing_time": 0.0
}
try:
# Extract resource usage from context
for dp in recent_data:
if dp.metric_type == PerformanceMetric.RESOURCE_USAGE and dp.context:
resource_usage["cpu_usage"] = max(resource_usage["cpu_usage"], dp.context.get("cpu_usage", 0.0))
resource_usage["memory_usage"] = max(resource_usage["memory_usage"], dp.context.get("memory_usage", 0.0))
resource_usage["api_calls"] += dp.context.get("api_calls", 0)
resource_usage["processing_time"] += dp.context.get("processing_time", 0.0)
# Calculate averages if multiple data points
if len(recent_data) > 0:
resource_usage["processing_time"] = resource_usage["processing_time"] / len(recent_data)
except Exception as e:
logger.error(f"Error calculating resource usage for agent {agent_id}: {e}")
return resource_usage
async def analyze_performance_trends(self, agent_id: str, period_hours: int = 24) -> List[PerformanceTrend]:
"""Analyze performance trends for an agent"""
try:
cutoff_time = datetime.utcnow().timestamp() - (period_hours * 60 * 60)
agent_data = [
dp for dp in self.performance_data[agent_id]
if datetime.fromisoformat(dp.timestamp).timestamp() > cutoff_time
]
if len(agent_data) < 5: # Need at least 5 data points for trend analysis
return []
trends = []
# Analyze trends for each metric type
for metric_type in PerformanceMetric:
metric_data = [dp for dp in agent_data if dp.metric_type == metric_type]
if len(metric_data) < 3: # Need at least 3 points for trend
continue
# Sort by timestamp
metric_data.sort(key=lambda x: x.timestamp)
# Calculate trend
trend_result = self._calculate_trend(metric_data)
if trend_result:
trend = PerformanceTrend(
metric_type=metric_type,
trend_direction=trend_result["direction"],
trend_strength=trend_result["strength"],
change_rate=trend_result["change_rate"],
confidence=trend_result["confidence"],
period_start=metric_data[0].timestamp,
period_end=metric_data[-1].timestamp
)
trends.append(trend)
logger.info(f"Analyzed performance trends for agent {agent_id}: found {len(trends)} trends")
return trends
except Exception as e:
logger.error(f"Error analyzing performance trends for agent {agent_id}: {e}")
return []
def _calculate_trend(self, data_points: List[PerformanceDataPoint]) -> Optional[Dict[str, Any]]:
"""Calculate trend from performance data points"""
try:
if len(data_points) < 3:
return None
# Extract values and timestamps
values = [dp.value for dp in data_points]
timestamps = [datetime.fromisoformat(dp.timestamp).timestamp() for dp in data_points]
# Simple linear trend calculation
n = len(values)
sum_x = sum(timestamps)
sum_y = sum(values)
sum_xy = sum(x * y for x, y in zip(timestamps, values))
sum_x2 = sum(x * x for x in timestamps)
# Calculate slope and intercept
slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x * sum_x)
intercept = (sum_y - slope * sum_x) / n
# Calculate correlation coefficient (confidence)
mean_y = sum_y / n
ss_tot = sum((y - mean_y) ** 2 for y in values)
ss_res = sum((y - (slope * x + intercept)) ** 2 for x, y in zip(timestamps, values))
r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0
# Determine trend direction
if abs(slope) < 0.001: # Nearly flat
direction = "stable"
strength = 0.0
elif slope > 0:
direction = "improving"
strength = min(1.0, abs(slope) * 100) # Scale slope to 0-1
else:
direction = "declining"
strength = min(1.0, abs(slope) * 100)
# Calculate change rate (percentage change per hour)
time_span = timestamps[-1] - timestamps[0]
if time_span > 0:
change_rate = (slope * 3600) / (values[0] if values[0] != 0 else 1) * 100 # Per hour
else:
change_rate = 0.0
return {
"direction": direction,
"strength": strength,
"change_rate": change_rate,
"confidence": r_squared
}
except Exception as e:
logger.error(f"Error calculating trend: {e}")
return None
async def generate_optimization_recommendations(self, agent_id: str) -> List[OptimizationRecommendation]:
"""Generate optimization recommendations for an agent"""
try:
recommendations = []
# Get current snapshot
snapshot = self.agent_snapshots.get(agent_id)
if not snapshot:
return []
# Get performance trends
trends = await self.analyze_performance_trends(agent_id)
# Generate recommendations based on performance analysis
# 1. Success rate recommendations
if snapshot.success_rate < self.performance_targets["success_rate"]:
recommendation = OptimizationRecommendation(
recommendation_id=f"success_rate_{agent_id}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}",
agent_id=agent_id,
user_id=self.user_id,
recommendation_type="success_rate_improvement",
priority="high" if snapshot.success_rate < self.alert_thresholds["success_rate"] else "medium",
description=f"Agent success rate is {snapshot.success_rate:.1%}, target is {self.performance_targets['success_rate']:.1%}",
expected_impact=self.performance_targets["success_rate"] - snapshot.success_rate,
implementation_steps=[
"Analyze recent failed actions to identify patterns",
"Review error logs for common failure causes",
"Update agent parameters or logic to address identified issues",
"Test improvements with small batch of actions",
"Monitor success rate improvement over time"
],
estimated_effort="medium"
)
recommendations.append(recommendation)
# 2. Response time recommendations
if snapshot.average_response_time > self.performance_targets["response_time"]:
recommendation = OptimizationRecommendation(
recommendation_id=f"response_time_{agent_id}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}",
agent_id=agent_id,
user_id=self.user_id,
recommendation_type="response_time_optimization",
priority="high" if snapshot.average_response_time > self.alert_thresholds["response_time"] else "medium",
description=f"Agent average response time is {snapshot.average_response_time:.1f}s, target is {self.performance_targets['response_time']:.1f}s",
expected_impact=(self.performance_targets["response_time"] - snapshot.average_response_time) / snapshot.average_response_time,
implementation_steps=[
"Profile agent execution to identify bottlenecks",
"Optimize API calls and external service interactions",
"Implement caching for frequently accessed data",
"Review and optimize agent logic and decision-making",
"Monitor response time improvement"
],
estimated_effort="high"
)
recommendations.append(recommendation)
# 3. Efficiency score recommendations
if snapshot.efficiency_score < self.performance_targets["efficiency_score"]:
recommendation = OptimizationRecommendation(
recommendation_id=f"efficiency_{agent_id}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}",
agent_id=agent_id,
user_id=self.user_id,
recommendation_type="efficiency_improvement",
priority="high" if snapshot.efficiency_score < self.alert_thresholds["efficiency_score"] else "medium",
description=f"Agent efficiency score is {snapshot.efficiency_score:.2f}, target is {self.performance_targets['efficiency_score']:.2f}",
expected_impact=self.performance_targets["efficiency_score"] - snapshot.efficiency_score,
implementation_steps=[
"Analyze agent decision-making patterns",
"Identify redundant or unnecessary operations",
"Optimize agent parameters and thresholds",
"Implement better error handling and recovery",
"Monitor efficiency score improvement"
],
estimated_effort="medium"
)
recommendations.append(recommendation)
# 4. Market impact recommendations
if snapshot.market_impact_score < self.performance_targets["market_impact"]:
recommendation = OptimizationRecommendation(
recommendation_id=f"market_impact_{agent_id}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}",
agent_id=agent_id,
user_id=self.user_id,
recommendation_type="market_impact_enhancement",
priority="medium",
description=f"Agent market impact score is {snapshot.market_impact_score:.2f}, target is {self.performance_targets['market_impact']:.2f}",
expected_impact=self.performance_targets["market_impact"] - snapshot.market_impact_score,
implementation_steps=[
"Analyze market signal detection accuracy",
"Improve market trend analysis and prediction",
"Enhance competitive intelligence gathering",
"Optimize timing and execution of market actions",
"Monitor market impact score improvement"
],
estimated_effort="high"
)
recommendations.append(recommendation)
# 5. Trend-based recommendations
for trend in trends:
if trend.trend_strength > 0.7 and trend.confidence > 0.8: # Strong trend with high confidence
if trend.trend_direction == "declining":
recommendation = OptimizationRecommendation(
recommendation_id=f"trend_{trend.metric_type.value}_{agent_id}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}",
agent_id=agent_id,
user_id=self.user_id,
recommendation_type="trend_reversal",
priority="high" if trend.trend_strength > 0.8 else "medium",
description=f"Strong declining trend detected in {trend.metric_type.value}: {trend.change_rate:.1f}% change per hour",
expected_impact=0.3, # Estimate 30% improvement potential
implementation_steps=[
f"Investigate causes of declining {trend.metric_type.value}",
"Identify specific factors contributing to the trend",
"Implement corrective measures based on findings",
"Monitor trend reversal over time",
"Adjust approach if trend continues"
],
estimated_effort="medium"
)
recommendations.append(recommendation)
# Sort by priority and expected impact
recommendations.sort(key=lambda x: (self._priority_weight(x.priority), x.expected_impact), reverse=True)
# Keep only top 10 recommendations
recommendations = recommendations[:10]
# Store recommendations
self.recommendations.extend(recommendations)
# Keep only recent recommendations (last 50)
if len(self.recommendations) > 50:
self.recommendations = self.recommendations[-50:]
logger.info(f"Generated {len(recommendations)} optimization recommendations for agent {agent_id}")
return recommendations
except Exception as e:
logger.error(f"Error generating optimization recommendations for agent {agent_id}: {e}")
return []
def _priority_weight(self, priority: str) -> int:
"""Convert priority to numeric weight for sorting"""
priority_weights = {
"high": 3,
"medium": 2,
"low": 1
}
return priority_weights.get(priority, 0)
async def get_performance_alerts(self, agent_id: str) -> List[Dict[str, Any]]:
"""Get performance alerts for an agent"""
alerts = []
try:
snapshot = self.agent_snapshots.get(agent_id)
if not snapshot:
return []
# Check success rate alert
if snapshot.success_rate < self.alert_thresholds["success_rate"]:
alerts.append({
"type": "performance_alert",
"metric": "success_rate",
"current_value": snapshot.success_rate,
"threshold": self.alert_thresholds["success_rate"],
"target": self.performance_targets["success_rate"],
"severity": "high" if snapshot.success_rate < 0.5 else "medium",
"message": f"Agent success rate ({snapshot.success_rate:.1%}) is below alert threshold ({self.alert_thresholds['success_rate']:.1%})",
"timestamp": datetime.utcnow().isoformat()
})
# Check response time alert
if snapshot.average_response_time > self.alert_thresholds["response_time"]:
alerts.append({
"type": "performance_alert",
"metric": "response_time",
"current_value": snapshot.average_response_time,
"threshold": self.alert_thresholds["response_time"],
"target": self.performance_targets["response_time"],
"severity": "high" if snapshot.average_response_time > 120 else "medium",
"message": f"Agent response time ({snapshot.average_response_time:.1f}s) exceeds alert threshold ({self.alert_thresholds['response_time']:.1f}s)",
"timestamp": datetime.utcnow().isoformat()
})
# Check efficiency score alert
if snapshot.efficiency_score < self.alert_thresholds["efficiency_score"]:
alerts.append({
"type": "performance_alert",
"metric": "efficiency_score",
"current_value": snapshot.efficiency_score,
"threshold": self.alert_thresholds["efficiency_score"],
"target": self.performance_targets["efficiency_score"],
"severity": "high" if snapshot.efficiency_score < 0.3 else "medium",
"message": f"Agent efficiency score ({snapshot.efficiency_score:.2f}) is below alert threshold ({self.alert_thresholds['efficiency_score']:.2f})",
"timestamp": datetime.utcnow().isoformat()
})
# Check market impact alert
if snapshot.market_impact_score < self.alert_thresholds["market_impact"]:
alerts.append({
"type": "performance_alert",
"metric": "market_impact",
"current_value": snapshot.market_impact_score,
"threshold": self.alert_thresholds["market_impact"],
"target": self.performance_targets["market_impact"],
"severity": "medium",
"message": f"Agent market impact score ({snapshot.market_impact_score:.2f}) is below alert threshold ({self.alert_thresholds['market_impact']:.2f})",
"timestamp": datetime.utcnow().isoformat()
})
return alerts
except Exception as e:
logger.error(f"Error getting performance alerts for agent {agent_id}: {e}")
return []
async def get_performance_summary(self, agent_id: str) -> Dict[str, Any]:
"""Get comprehensive performance summary for an agent"""
try:
snapshot = self.agent_snapshots.get(agent_id)
if not snapshot:
return {}
# Get trends
trends = await self.analyze_performance_trends(agent_id)
# Get recommendations
recommendations = await self.generate_optimization_recommendations(agent_id)
# Get alerts
alerts = await self.get_performance_alerts(agent_id)
# Calculate overall health score
health_score = self._calculate_health_score(snapshot)
return {
"agent_id": agent_id,
"user_id": self.user_id,
"timestamp": datetime.utcnow().isoformat(),
"overall_health": health_score,
"current_performance": asdict(snapshot),
"performance_trends": [asdict(trend) for trend in trends],
"optimization_recommendations": [asdict(rec) for rec in recommendations],
"performance_alerts": alerts,
"performance_targets": self.performance_targets,
"alert_thresholds": self.alert_thresholds
}
except Exception as e:
logger.error(f"Error getting performance summary for agent {agent_id}: {e}")
return {}
def _calculate_health_score(self, snapshot: AgentPerformanceSnapshot) -> float:
"""Calculate overall health score based on key metrics"""
try:
# Weighted scoring based on key metrics
weights = {
"success_rate": 0.3,
"response_time": 0.25,
"efficiency_score": 0.25,
"market_impact": 0.2
}
scores = {
"success_rate": min(1.0, snapshot.success_rate / self.performance_targets["success_rate"]),
"response_time": max(0.0, 1.0 - (snapshot.average_response_time / self.performance_targets["response_time"])),
"efficiency_score": min(1.0, snapshot.efficiency_score / self.performance_targets["efficiency_score"]),
"market_impact": min(1.0, snapshot.market_impact_score / self.performance_targets["market_impact"])
}
# Calculate weighted health score
health_score = sum(scores[metric] * weights[metric] for metric in weights.keys())
return round(health_score, 2)
except Exception as e:
logger.error(f"Error calculating health score: {e}")
return 0.0
def get_all_agents_performance(self) -> List[Dict[str, Any]]:
"""Get performance summary for all agents"""
all_performance = []
for agent_id, snapshot in self.agent_snapshots.items():
performance_summary = {
"agent_id": agent_id,
"user_id": self.user_id,
"status": snapshot.status.value,
"success_rate": snapshot.success_rate,
"efficiency_score": snapshot.efficiency_score,
"response_time": snapshot.average_response_time,
"market_impact": snapshot.market_impact_score,
"total_actions": snapshot.total_actions,
"last_action": snapshot.last_action_at,
"health_score": self._calculate_health_score(snapshot)
}
all_performance.append(performance_summary)
return all_performance
# Service class for performance monitoring
class AgentPerformanceService:
"""Service class for agent performance monitoring operations"""
def __init__(self):
self.metrics_buffer = deque(maxlen=1000)
self.performance_history = defaultdict(list)
self.alert_thresholds = {
PerformanceMetric.SUCCESS_RATE: 0.8, # Alert if success rate < 80%
PerformanceMetric.RESPONSE_TIME: 30.0, # Alert if response time > 30s
PerformanceMetric.GOAL_COMPLETION_RATE: 0.7 # Alert if completion < 70%
}
self.monitors: Dict[str, AgentPerformanceMonitor] = {}
self.global_performance_history: deque = deque(maxlen=5000) # Global history
async def record_metric(self,
agent_id: str,
metric_type: PerformanceMetric,
value: float,
context: Optional[Dict[str, Any]] = None):
"""Record a performance metric for an agent"""
metric_entry = AgentPerformanceMetrics(
agent_id=agent_id,
timestamp=datetime.utcnow(),
metrics={metric_type.value: value},
context=context or {}
)
async def get_monitor(self, user_id: str) -> AgentPerformanceMonitor:
"""Get or create a performance monitor for a user"""
if user_id not in self.monitors:
self.monitors[user_id] = AgentPerformanceMonitor(user_id)
return self.monitors[user_id]
async def record_agent_performance(self, user_id: str, agent_id: str, metric_type: PerformanceMetric, value: float, context: Dict[str, Any] = None) -> bool:
"""Record performance data for an agent"""
monitor = await self.get_monitor(user_id)
success = await monitor.record_performance_data(agent_id, metric_type, value, context)
self.metrics_buffer.append(metric_entry)
self.performance_history[agent_id].append(metric_entry)
# Check thresholds
await self._check_thresholds(agent_id, metric_type, value)
# Persist if needed (batching implemented in production)
# await self._persist_metric(metric_entry)
async def get_agent_performance(self, agent_id: str, time_window_minutes: int = 60) -> Dict[str, Any]:
"""Get aggregated performance metrics for an agent"""
cutoff_time = datetime.utcnow() - timedelta(minutes=time_window_minutes)
relevant_metrics = [
m for m in self.performance_history[agent_id]
if m.timestamp > cutoff_time
]
if not relevant_metrics:
return {}
aggregated = defaultdict(list)
for m in relevant_metrics:
for k, v in m.metrics.items():
aggregated[k].append(v)
result = {
"agent_id": agent_id,
"period_minutes": time_window_minutes,
"sample_size": len(relevant_metrics),
"metrics": {
k: sum(v) / len(v) for k, v in aggregated.items()
}
}
return result
async def _check_thresholds(self, agent_id: str, metric_type: PerformanceMetric, value: float):
"""Check if metric violates thresholds"""
threshold = self.alert_thresholds.get(metric_type)
if not threshold:
return
is_violation = False
if metric_type in [PerformanceMetric.SUCCESS_RATE, PerformanceMetric.GOAL_COMPLETION_RATE]:
if value < threshold:
is_violation = True
elif value > threshold:
is_violation = True
if is_violation:
logger.warning(
f"Performance alert for agent {agent_id}: "
f"{metric_type.value} = {value} (Threshold: {threshold})"
if success:
# Also record in global history
data_point = PerformanceDataPoint(
timestamp=datetime.utcnow().isoformat(),
metric_type=metric_type,
value=value,
context=context or {},
agent_id=agent_id,
user_id=user_id
)
# Trigger alert notification (impl via notification service)
self.global_performance_history.append(data_point)
return success
async def update_agent_performance_snapshot(self, user_id: str, agent_id: str, status: AgentStatus, action_result: Dict[str, Any] = None) -> AgentPerformanceSnapshot:
"""Update performance snapshot for an agent"""
monitor = await self.get_monitor(user_id)
return await monitor.update_agent_snapshot(agent_id, status, action_result)
async def get_agent_performance_summary(self, user_id: str, agent_id: str) -> Dict[str, Any]:
"""Get comprehensive performance summary for an agent"""
monitor = await self.get_monitor(user_id)
return await monitor.get_performance_summary(agent_id)
async def get_all_agents_performance_summary(self, user_id: str) -> List[Dict[str, Any]]:
"""Get performance summary for all agents for a user"""
monitor = await self.get_monitor(user_id)
return monitor.get_all_agents_performance()
async def get_global_performance_stats(self) -> Dict[str, Any]:
"""Get global performance statistics across all users and agents"""
if not self.global_performance_history:
return {}
# Calculate global statistics
total_actions = len([dp for dp in self.global_performance_history if dp.metric_type == PerformanceMetric.SUCCESS_RATE])
successful_actions = len([dp for dp in self.global_performance_history if dp.metric_type == PerformanceMetric.SUCCESS_RATE and dp.value > 0.5])
response_times = [dp.value for dp in self.global_performance_history if dp.metric_type == PerformanceMetric.RESPONSE_TIME]
avg_response_time = sum(response_times) / len(response_times) if response_times else 0.0
efficiency_scores = [dp.value for dp in self.global_performance_history if dp.metric_type == PerformanceMetric.EFFICIENCY_SCORE]
avg_efficiency = sum(efficiency_scores) / len(efficiency_scores) if efficiency_scores else 0.0
unique_users = len(set(dp.user_id for dp in self.global_performance_history))
unique_agents = len(set(dp.agent_id for dp in self.global_performance_history))
return {
"total_actions": total_actions,
"successful_actions": successful_actions,
"overall_success_rate": successful_actions / total_actions if total_actions > 0 else 0.0,
"average_response_time": avg_response_time,
"average_efficiency_score": avg_efficiency,
"unique_users": unique_users,
"unique_agents": unique_agents,
"total_data_points": len(self.global_performance_history),
"timestamp": datetime.utcnow().isoformat()
}
# Singleton instance
performance_monitor = PerformanceMonitor()
AgentPerformanceMonitor = PerformanceMonitor
performance_service = performance_monitor
# Global service instance
performance_service = AgentPerformanceService()
# Convenience functions for external use
async def record_agent_performance(user_id: str, agent_id: str, metric_type: PerformanceMetric, value: float, context: Dict[str, Any] = None) -> bool:
"""Record performance data for an agent"""
return await performance_service.record_agent_performance(user_id, agent_id, metric_type, value, context)
async def update_agent_performance_snapshot(user_id: str, agent_id: str, status: AgentStatus, action_result: Dict[str, Any] = None) -> AgentPerformanceSnapshot:
"""Update performance snapshot for an agent"""
return await performance_service.update_agent_performance_snapshot(user_id, agent_id, status, action_result)
async def get_agent_performance_summary(user_id: str, agent_id: str) -> Dict[str, Any]:
"""Get comprehensive performance summary for an agent"""
return await performance_service.get_agent_performance_summary(user_id, agent_id)
async def get_all_agents_performance_summary(user_id: str) -> List[Dict[str, Any]]:
"""Get performance summary for all agents for a user"""
return await performance_service.get_all_agents_performance_summary(user_id)

View File

@@ -0,0 +1,470 @@
import React, { useEffect, useState } from 'react';
import {
Box,
Card,
CardContent,
Typography,
Grid,
LinearProgress,
Chip,
IconButton,
Tooltip,
Alert,
Skeleton,
Button
} from '@mui/material';
import {
TrendingUp as TrendingUpIcon,
TrendingDown as TrendingDownIcon,
Warning as WarningIcon,
CheckCircle as CheckCircleIcon,
Refresh as RefreshIcon,
Info as InfoIcon,
Speed as SpeedIcon,
Assessment as AssessmentIcon,
Group as GroupIcon,
Lightbulb as LightbulbIcon
} from '@mui/icons-material';
import { motion, AnimatePresence } from 'framer-motion';
import { GlassCard, ShimmerHeader } from '../../shared/styled';
import { useSemanticDashboardStore } from '../../../stores/semanticDashboardStore';
import { SemanticHealthMetric, CompetitorSemanticSnapshot, ContentSemanticInsight } from '../../../api/semanticDashboard';
interface SemanticDashboardProps {
className?: string;
}
const SemanticDashboard: React.FC<SemanticDashboardProps> = ({ className }) => {
const {
semanticHealth,
competitorSnapshots,
contentInsights,
loading,
error,
lastUpdated,
monitoringStatus,
fetchSemanticHealth,
fetchCompetitorSnapshots,
fetchContentInsights,
fetchAllSemanticData,
refreshSemanticAnalysis,
getHealthStatusColor,
getInsightTypeColor,
getConfidenceColor
} = useSemanticDashboardStore();
const [refreshing, setRefreshing] = useState(false);
// Fetch semantic data on component mount
useEffect(() => {
fetchAllSemanticData();
}, []);
// Auto-refresh every 24 hours (86400000ms)
useEffect(() => {
const interval = setInterval(() => {
if (monitoringStatus === 'active') {
fetchAllSemanticData();
}
}, 86400000); // 24 hours
return () => clearInterval(interval);
}, [monitoringStatus, fetchAllSemanticData]);
const handleRefresh = async () => {
setRefreshing(true);
try {
await refreshSemanticAnalysis();
} catch (error) {
console.error('Failed to refresh semantic analysis:', error);
} finally {
setRefreshing(false);
}
};
const getStatusIcon = (status: string) => {
switch (status) {
case 'healthy':
return <CheckCircleIcon sx={{ color: '#4CAF50' }} />;
case 'warning':
return <WarningIcon sx={{ color: '#FF9800' }} />;
case 'critical':
return <WarningIcon sx={{ color: '#F44336' }} />;
default:
return <InfoIcon sx={{ color: '#9E9E9E' }} />;
}
};
const getTrendIcon = (trend: string) => {
switch (trend) {
case 'up':
return <TrendingUpIcon sx={{ color: '#4CAF50' }} />;
case 'down':
return <TrendingDownIcon sx={{ color: '#F44336' }} />;
default:
return <AssessmentIcon sx={{ color: '#9E9E9E' }} />;
}
};
const formatLastUpdated = (timestamp: string | null) => {
if (!timestamp) return 'Never';
const date = new Date(timestamp);
const now = new Date();
const diffMs = now.getTime() - date.getTime();
const diffHours = Math.floor(diffMs / (1000 * 60 * 60));
if (diffHours < 1) return 'Just now';
if (diffHours < 24) return `${diffHours}h ago`;
return `${Math.floor(diffHours / 24)}d ago`;
};
if (error) {
return (
<GlassCard className={className}>
<CardContent>
<Alert severity="error" sx={{ mb: 2 }}>
{error}
</Alert>
<Button
variant="outlined"
onClick={fetchAllSemanticData}
disabled={loading}
startIcon={<RefreshIcon />}
>
Retry
</Button>
</CardContent>
</GlassCard>
);
}
return (
<Box className={className}>
{/* Header */}
<Box display="flex" alignItems="center" justifyContent="space-between" mb={3}>
<Box display="flex" alignItems="center" gap={1}>
<AssessmentIcon sx={{ color: '#64B5F6', fontSize: 28 }} />
<Typography variant="h5" component="h2" sx={{ color: 'white', fontWeight: 600 }}>
Semantic Intelligence
</Typography>
<Chip
label={monitoringStatus === 'active' ? 'Active' : 'Inactive'}
color={monitoringStatus === 'active' ? 'success' : 'default'}
size="small"
sx={{ ml: 1 }}
/>
</Box>
<Box display="flex" alignItems="center" gap={1}>
<Typography variant="caption" sx={{ color: 'rgba(255,255,255,0.7)' }}>
Updated: {formatLastUpdated(lastUpdated)}
</Typography>
<Tooltip title="Refresh semantic analysis">
<IconButton
onClick={handleRefresh}
disabled={loading || refreshing}
sx={{ color: 'rgba(255,255,255,0.7)' }}
>
<RefreshIcon />
</IconButton>
</Tooltip>
</Box>
</Box>
{/* Semantic Health Card */}
<GlassCard sx={{ mb: 3 }}>
<ShimmerHeader />
<CardContent>
<Box display="flex" alignItems="center" mb={2}>
<SpeedIcon sx={{ color: '#64B5F6', mr: 1 }} />
<Typography variant="h6" sx={{ color: 'white', fontWeight: 600 }}>
Semantic Health
</Typography>
</Box>
{loading && !semanticHealth ? (
<Box>
<Skeleton variant="rectangular" height={60} sx={{ mb: 2, borderRadius: 2 }} />
<Skeleton variant="text" width="80%" />
<Skeleton variant="text" width="60%" />
</Box>
) : semanticHealth ? (
<motion.div
initial={{ opacity: 0, y: 20 }}
animate={{ opacity: 1, y: 0 }}
transition={{ duration: 0.5 }}
>
<Box display="flex" alignItems="center" mb={2}>
{getStatusIcon(semanticHealth.status)}
<Box ml={2} flex={1}>
<Typography variant="h6" sx={{ color: 'white' }}>
{semanticHealth.metric_name.replace(/_/g, ' ').toUpperCase()}
</Typography>
<Typography variant="body2" sx={{ color: 'rgba(255,255,255,0.7)' }}>
{semanticHealth.description}
</Typography>
</Box>
<Box textAlign="right">
<Typography variant="h4" sx={{ color: 'white', fontWeight: 700 }}>
{Math.round(semanticHealth.value * 100)}%
</Typography>
<LinearProgress
variant="determinate"
value={semanticHealth.value * 100}
sx={{
width: 100,
height: 6,
borderRadius: 3,
backgroundColor: 'rgba(255,255,255,0.2)',
'& .MuiLinearProgress-bar': {
backgroundColor: getHealthStatusColor(semanticHealth.status),
borderRadius: 3
}
}}
/>
</Box>
</Box>
{semanticHealth.recommendations.length > 0 && (
<Box>
<Typography variant="subtitle2" sx={{ color: 'rgba(255,255,255,0.8)', mb: 1 }}>
Recommendations:
</Typography>
{semanticHealth.recommendations.map((rec, index) => (
<Typography key={index} variant="body2" sx={{ color: 'rgba(255,255,255,0.7)', mb: 0.5 }}>
{rec}
</Typography>
))}
</Box>
)}
</motion.div>
) : (
<Typography sx={{ color: 'rgba(255,255,255,0.7)' }}>
No semantic health data available
</Typography>
)}
</CardContent>
</GlassCard>
{/* Competitor Semantic Analysis */}
<GlassCard sx={{ mb: 3 }}>
<ShimmerHeader />
<CardContent>
<Box display="flex" alignItems="center" mb={2}>
<GroupIcon sx={{ color: '#81C784', mr: 1 }} />
<Typography variant="h6" sx={{ color: 'white', fontWeight: 600 }}>
Competitor Semantic Positioning
</Typography>
</Box>
{loading && competitorSnapshots.length === 0 ? (
<Grid container spacing={2}>
{[1, 2, 3].map((i) => (
<Grid item xs={12} md={6} key={i}>
<Skeleton variant="rectangular" height={120} sx={{ borderRadius: 2 }} />
</Grid>
))}
</Grid>
) : competitorSnapshots.length > 0 ? (
<Grid container spacing={2}>
<AnimatePresence>
{competitorSnapshots.map((competitor, index) => (
<Grid item xs={12} md={6} key={competitor.competitor_id}>
<motion.div
initial={{ opacity: 0, y: 20 }}
animate={{ opacity: 1, y: 0 }}
transition={{ duration: 0.5, delay: index * 0.1 }}
>
<Card
sx={{
background: 'rgba(255,255,255,0.05)',
backdropFilter: 'blur(10px)',
border: '1px solid rgba(255,255,255,0.1)',
borderRadius: 2
}}
>
<CardContent>
<Typography variant="h6" sx={{ color: 'white', mb: 1 }}>
{competitor.competitor_name}
</Typography>
<Box display="flex" justifyContent="space-between" alignItems="center" mb={1}>
<Typography variant="body2" sx={{ color: 'rgba(255,255,255,0.7)' }}>
Semantic Overlap
</Typography>
<Typography variant="h6" sx={{ color: 'white' }}>
{Math.round(competitor.semantic_overlap * 100)}%
</Typography>
</Box>
<Box display="flex" justifyContent="space-between" alignItems="center" mb={1}>
<Typography variant="body2" sx={{ color: 'rgba(255,255,255,0.7)' }}>
Authority Score
</Typography>
<Typography variant="h6" sx={{ color: 'white' }}>
{Math.round(competitor.authority_score * 100)}
</Typography>
</Box>
<Box display="flex" justifyContent="space-between" alignItems="center">
<Typography variant="body2" sx={{ color: 'rgba(255,255,255,0.7)' }}>
Content Volume
</Typography>
<Typography variant="body2" sx={{ color: 'white' }}>
{competitor.content_volume} pages
</Typography>
</Box>
{competitor.trending_topics.length > 0 && (
<Box mt={2}>
<Typography variant="caption" sx={{ color: 'rgba(255,255,255,0.7)', mb: 1, display: 'block' }}>
Trending Topics:
</Typography>
<Box display="flex" flexWrap="wrap" gap={0.5}>
{competitor.trending_topics.slice(0, 3).map((topic, idx) => (
<Chip
key={idx}
label={topic}
size="small"
sx={{
backgroundColor: 'rgba(255,255,255,0.1)',
color: 'white',
fontSize: '0.7rem'
}}
/>
))}
</Box>
</Box>
)}
</CardContent>
</Card>
</motion.div>
</Grid>
))}
</AnimatePresence>
</Grid>
) : (
<Typography sx={{ color: 'rgba(255,255,255,0.7)' }}>
No competitor semantic data available
</Typography>
)}
</CardContent>
</GlassCard>
{/* Content Insights */}
<GlassCard>
<ShimmerHeader />
<CardContent>
<Box display="flex" alignItems="center" mb={2}>
<LightbulbIcon sx={{ color: '#FFD54F', mr: 1 }} />
<Typography variant="h6" sx={{ color: 'white', fontWeight: 600 }}>
Content Intelligence Insights
</Typography>
</Box>
{loading && contentInsights.length === 0 ? (
<Box>
{[1, 2, 3].map((i) => (
<Box key={i} mb={2}>
<Skeleton variant="rectangular" height={80} sx={{ borderRadius: 2, mb: 1 }} />
</Box>
))}
</Box>
) : contentInsights.length > 0 ? (
<AnimatePresence>
{contentInsights.map((insight, index) => (
<motion.div
key={insight.insight_id}
initial={{ opacity: 0, y: 20 }}
animate={{ opacity: 1, y: 0 }}
transition={{ duration: 0.5, delay: index * 0.1 }}
>
<Card
sx={{
background: 'rgba(255,255,255,0.05)',
backdropFilter: 'blur(10px)',
border: '1px solid rgba(255,255,255,0.1)',
borderRadius: 2,
mb: 2
}}
>
<CardContent>
<Box display="flex" alignItems="center" justifyContent="space-between" mb={1}>
<Box display="flex" alignItems="center" gap={1}>
<Chip
label={insight.insight_type.toUpperCase()}
size="small"
sx={{
backgroundColor: getInsightTypeColor(insight.insight_type),
color: 'white',
fontWeight: 600,
fontSize: '0.7rem'
}}
/>
<Typography variant="subtitle2" sx={{ color: 'white', fontWeight: 600 }}>
{insight.title}
</Typography>
</Box>
<Box display="flex" alignItems="center" gap={1}>
<Tooltip title={`Confidence: ${Math.round(insight.confidence_score * 100)}%`}>
<Box>
<LinearProgress
variant="determinate"
value={insight.confidence_score * 100}
sx={{
width: 60,
height: 4,
borderRadius: 2,
backgroundColor: 'rgba(255,255,255,0.2)',
'& .MuiLinearProgress-bar': {
backgroundColor: getConfidenceColor(insight.confidence_score),
borderRadius: 2
}
}}
/>
</Box>
</Tooltip>
</Box>
</Box>
<Typography variant="body2" sx={{ color: 'rgba(255,255,255,0.8)', mb: 1 }}>
{insight.description}
</Typography>
<Box display="flex" justifyContent="space-between" alignItems="center">
<Typography variant="caption" sx={{ color: 'rgba(255,255,255,0.6)' }}>
Impact Score: {Math.round(insight.impact_score * 100)}/100
</Typography>
<Typography variant="caption" sx={{ color: 'rgba(255,255,255,0.6)' }}>
Expires: {new Date(insight.expires_at).toLocaleDateString()}
</Typography>
</Box>
{insight.suggested_actions.length > 0 && (
<Box mt={1}>
<Typography variant="caption" sx={{ color: 'rgba(255,255,255,0.7)', mb: 0.5, display: 'block' }}>
Suggested Actions:
</Typography>
{insight.suggested_actions.map((action, idx) => (
<Typography key={idx} variant="caption" sx={{ color: 'rgba(255,255,255,0.6)', display: 'block' }}>
{action}
</Typography>
))}
</Box>
)}
</CardContent>
</Card>
</motion.div>
))}
</AnimatePresence>
) : (
<Typography sx={{ color: 'rgba(255,255,255,0.7)' }}>
No content insights available
</Typography>
)}
</CardContent>
</GlassCard>
</Box>
);
};
export default SemanticDashboard;