diff --git a/backend/services/intelligence/agents/market_signal_detector.py b/backend/services/intelligence/agents/market_signal_detector.py index e07900bb..d5ecd366 100644 --- a/backend/services/intelligence/agents/market_signal_detector.py +++ b/backend/services/intelligence/agents/market_signal_detector.py @@ -172,79 +172,646 @@ class MarketSignalDetector: return prioritized_signals except Exception as e: - logger.error(f"Error detecting market signals: {str(e)}") + logger.error(f"Error detecting market signals for user {self.user_id}: {e}") return [] - + async def _get_signal_context(self) -> SignalContext: - """Fetch current context for signal detection""" - # Placeholder implementation - return SignalContext( - user_id=self.user_id, - competitor_data={}, - semantic_health={}, - seo_performance={}, - content_analysis={}, - historical_data={} - ) - - def _is_cache_valid(self, signals: List[MarketSignal]) -> bool: - """Check if cached signals are still valid""" - if not signals: - return False - # Basic check for now - return True - + """Get comprehensive context for signal detection""" + try: + # Get semantic health + semantic_health = await self.semantic_monitor.check_semantic_health(self.user_id) + + # Get competitor data + competitor_data = await self._get_competitor_data() + + # Get SEO performance + seo_performance = await self._get_seo_performance() + + # Get content analysis + content_analysis = await self._get_content_analysis() + + # Get historical data + historical_data = await self._get_historical_data() + + return SignalContext( + user_id=self.user_id, + competitor_data=competitor_data, + semantic_health=semantic_health, + seo_performance=seo_performance, + content_analysis=content_analysis, + historical_data=historical_data + ) + + except Exception as e: + logger.error(f"Error getting signal context for user {self.user_id}: {e}") + # Return minimal context + return SignalContext( + user_id=self.user_id, + competitor_data={}, + semantic_health={}, + seo_performance={}, + content_analysis={}, + historical_data={} + ) + async def _detect_competitor_signals(self, context: SignalContext) -> List[MarketSignal]: - """Detect signals from competitor activities""" - return [] - - async def _detect_serp_signals(self, context: SignalContext) -> List[MarketSignal]: - """Detect signals from SERP changes""" - return [] - - async def _detect_social_signals(self, context: SignalContext) -> List[MarketSignal]: - """Detect signals from social trends""" - return [] - - async def _detect_industry_signals(self, context: SignalContext) -> List[MarketSignal]: - """Detect signals from industry news""" - return [] - - async def _detect_performance_signals(self, context: SignalContext) -> List[MarketSignal]: - """Detect signals from site performance""" - return [] - - async def _detect_content_gap_signals(self, context: SignalContext) -> List[MarketSignal]: - """Detect signals from content gaps""" - return [] - - async def _detect_seo_opportunity_signals(self, context: SignalContext) -> List[MarketSignal]: - """Detect signals from SEO opportunities""" - return [] - - def _filter_signals(self, signals: List[MarketSignal]) -> List[MarketSignal]: - """Filter out low-quality or duplicate signals""" + """Detect competitor-related market signals""" + signals = [] + + try: + competitor_data = context.competitor_data.get('competitors', []) + + for competitor in competitor_data: + competitor_id = competitor.get('id') + competitor_name = competitor.get('name', 'Unknown Competitor') + + # Check for significant changes in competitor metrics + current_metrics = { + 'content_volume': competitor.get('content_volume', 0), + 'semantic_overlap': competitor.get('semantic_overlap', 0), + 'authority_score': competitor.get('authority_score', 0), + 'trending_topics': len(competitor.get('trending_topics', [])) + } + + # Compare with baseline metrics + baseline_key = f"competitor_{competitor_id}" + baseline = self.baseline_metrics.get(baseline_key, current_metrics) + + # Detect significant changes + for metric, current_value in current_metrics.items(): + baseline_value = baseline.get(metric, current_value) + change_percentage = abs(current_value - baseline_value) / max(baseline_value, 1) + + if change_percentage > self.thresholds['competitor_change_threshold']: + signal = MarketSignal( + signal_id=f"competitor_{competitor_id}_{metric}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}", + signal_type=SignalType.COMPETITOR_CHANGE, + source=competitor_name, + description=f"Competitor {competitor_name} shows significant change in {metric}: {change_percentage:.1%}", + impact_score=min(0.9, change_percentage * 2), # Cap at 0.9 + urgency_level=self._determine_urgency(change_percentage), + confidence_score=0.8, + related_topics=competitor.get('trending_topics', [])[:3], + suggested_actions=self._get_competitor_response_actions(metric, change_percentage), + metadata={ + 'competitor_id': competitor_id, + 'metric': metric, + 'old_value': baseline_value, + 'new_value': current_value, + 'change_percentage': change_percentage + } + ) + signals.append(signal) + + # Update baseline + self.baseline_metrics[baseline_key] = current_metrics + + except Exception as e: + logger.error(f"Error detecting competitor signals: {e}") + return signals - + + async def _detect_serp_signals(self, context: SignalContext) -> List[MarketSignal]: + """Detect SERP-related market signals""" + signals = [] + + try: + seo_performance = context.seo_performance + + # Check for significant SERP position changes + serp_changes = seo_performance.get('serp_changes', []) + + for change in serp_changes: + keyword = change.get('keyword') + old_position = change.get('old_position', 100) + new_position = change.get('new_position', 100) + + # Calculate position change + position_change = old_position - new_position # Positive = improvement + change_percentage = abs(position_change) / max(old_position, 1) + + if change_percentage > self.thresholds['serp_fluctuation_threshold']: + if position_change > 0: # Improvement + description = f"Significant SERP improvement for '{keyword}': moved from {old_position} to {new_position}" + impact_score = min(0.8, change_percentage * 1.5) + urgency_level = UrgencyLevel.LOW + suggested_actions = ["Monitor trend", "Capitalize on improvement"] + else: # Decline + description = f"Significant SERP decline for '{keyword}': dropped from {old_position} to {new_position}" + impact_score = min(0.9, change_percentage * 2) + urgency_level = UrgencyLevel.HIGH + suggested_actions = ["Investigate cause", "Optimize content", "Check technical SEO"] + + signal = MarketSignal( + signal_id=f"serp_{keyword.replace(' ', '_')}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}", + signal_type=SignalType.SERP_FLUCTUATION, + source="SERP Analysis", + description=description, + impact_score=impact_score, + urgency_level=urgency_level, + confidence_score=0.85, + related_topics=[keyword], + suggested_actions=suggested_actions, + metadata={ + 'keyword': keyword, + 'old_position': old_position, + 'new_position': new_position, + 'position_change': position_change, + 'change_percentage': change_percentage + } + ) + signals.append(signal) + + except Exception as e: + logger.error(f"Error detecting SERP signals: {e}") + + return signals + + async def _detect_social_signals(self, context: SignalContext) -> List[MarketSignal]: + """Detect social media trend signals""" + signals = [] + + try: + # Get social media data + social_data = context.historical_data.get('social_metrics', {}) + + # Check for trending topics + trending_topics = social_data.get('trending_topics', []) + + for topic in trending_topics: + topic_name = topic.get('topic') + engagement_rate = topic.get('engagement_rate', 0) + trend_score = topic.get('trend_score', 0) + + if trend_score > self.thresholds['social_trend_threshold']: + signal = MarketSignal( + signal_id=f"social_{topic_name.replace(' ', '_')}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}", + signal_type=SignalType.SOCIAL_TREND, + source="Social Media Analysis", + description=f"Social trend detected: '{topic_name}' with engagement rate {engagement_rate:.2%}", + impact_score=min(0.8, trend_score * 1.5), + urgency_level=self._determine_urgency(trend_score), + confidence_score=0.75, + related_topics=[topic_name], + suggested_actions=["Create content on trending topic", "Monitor trend development", "Engage with trend"], + metadata={ + 'topic': topic_name, + 'engagement_rate': engagement_rate, + 'trend_score': trend_score, + 'platforms': topic.get('platforms', []) + } + ) + signals.append(signal) + + except Exception as e: + logger.error(f"Error detecting social signals: {e}") + + return signals + + async def _detect_industry_signals(self, context: SignalContext) -> List[MarketSignal]: + """Detect industry news and trend signals""" + signals = [] + + try: + # Get industry data + industry_data = context.historical_data.get('industry_news', {}) + + # Check for significant industry developments + news_items = industry_data.get('recent_news', []) + + for news in news_items: + news_title = news.get('title', 'Industry News') + relevance_score = news.get('relevance_score', 0) + impact_assessment = news.get('impact_assessment', 'medium') + + if relevance_score > 0.6: # High relevance to user's industry + signal = MarketSignal( + signal_id=f"industry_{hash(news_title) % 10000}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}", + signal_type=SignalType.INDUSTRY_NEWS, + source="Industry News Analysis", + description=f"Industry development: {news_title}", + impact_score=min(0.9, relevance_score * 1.2), + urgency_level=self._map_impact_to_urgency(impact_assessment), + confidence_score=0.8, + related_topics=news.get('related_topics', []), + suggested_actions=["Analyze industry impact", "Adjust strategy if needed", "Monitor competitor response"], + metadata={ + 'news_title': news_title, + 'relevance_score': relevance_score, + 'impact_assessment': impact_assessment, + 'news_date': news.get('date'), + 'source': news.get('source') + } + ) + signals.append(signal) + + except Exception as e: + logger.error(f"Error detecting industry signals: {e}") + + return signals + + async def _detect_performance_signals(self, context: SignalContext) -> List[MarketSignal]: + """Detect performance change signals""" + signals = [] + + try: + # Get performance data + performance_data = context.historical_data.get('performance_metrics', {}) + + # Check for significant changes in key metrics + current_metrics = { + 'traffic': performance_data.get('current_traffic', 0), + 'engagement': performance_data.get('current_engagement', 0), + 'conversion_rate': performance_data.get('current_conversion_rate', 0), + 'bounce_rate': performance_data.get('current_bounce_rate', 0) + } + + # Compare with historical baseline + baseline_metrics = performance_data.get('baseline_metrics', current_metrics) + + for metric, current_value in current_metrics.items(): + baseline_value = baseline_metrics.get(metric, current_value) + + if baseline_value > 0: # Avoid division by zero + change_percentage = abs(current_value - baseline_value) / baseline_value + + if change_percentage > self.thresholds['performance_change_threshold']: + if current_value > baseline_value: # Improvement + description = f"Performance improvement detected: {metric} increased by {change_percentage:.1%}" + impact_score = min(0.7, change_percentage * 1.5) + urgency_level = UrgencyLevel.LOW + suggested_actions = ["Monitor trend", "Analyze success factors", "Scale successful strategies"] + else: # Decline + description = f"Performance decline detected: {metric} decreased by {change_percentage:.1%}" + impact_score = min(0.9, change_percentage * 2) + urgency_level = UrgencyLevel.HIGH + suggested_actions = ["Investigate cause", "Implement corrective measures", "Monitor recovery"] + + signal = MarketSignal( + signal_id=f"performance_{metric}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}", + signal_type=SignalType.PERFORMANCE_CHANGE, + source="Performance Analytics", + description=description, + impact_score=impact_score, + urgency_level=urgency_level, + confidence_score=0.9, + related_topics=[metric], + suggested_actions=suggested_actions, + metadata={ + 'metric': metric, + 'old_value': baseline_value, + 'new_value': current_value, + 'change_percentage': change_percentage, + 'trend_direction': 'up' if current_value > baseline_value else 'down' + } + ) + signals.append(signal) + + except Exception as e: + logger.error(f"Error detecting performance signals: {e}") + + return signals + + async def _detect_content_gap_signals(self, context: SignalContext) -> List[MarketSignal]: + """Detect content gap signals""" + signals = [] + + try: + semantic_health = context.semantic_health + + # Check for significant semantic gaps + semantic_gaps = semantic_health.get('semantic_gaps', []) + + for gap in semantic_gaps: + gap_topic = gap.get('topic') + gap_score = gap.get('gap_score', 0) + competitor_coverage = gap.get('competitor_coverage', 0) + + if gap_score > self.thresholds['content_gap_threshold']: + signal = MarketSignal( + signal_id=f"content_gap_{gap_topic.replace(' ', '_')}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}", + signal_type=SignalType.CONTENT_GAP, + source="Semantic Analysis", + description=f"Content gap identified: '{gap_topic}' with gap score {gap_score:.2f}", + impact_score=min(0.8, gap_score * 1.5), + urgency_level=self._determine_urgency(gap_score), + confidence_score=0.85, + related_topics=[gap_topic], + suggested_actions=["Create content on gap topic", "Analyze competitor approach", "Optimize existing content"], + metadata={ + 'gap_topic': gap_topic, + 'gap_score': gap_score, + 'competitor_coverage': competitor_coverage, + 'semantic_similarity': gap.get('semantic_similarity', 0) + } + ) + signals.append(signal) + + except Exception as e: + logger.error(f"Error detecting content gap signals: {e}") + + return signals + + async def _detect_seo_opportunity_signals(self, context: SignalContext) -> List[MarketSignal]: + """Detect SEO opportunity signals""" + signals = [] + + try: + seo_performance = context.seo_performance + + # Check for SEO opportunities + seo_opportunities = seo_performance.get('opportunities', []) + + for opportunity in seo_opportunities: + opportunity_type = opportunity.get('type') + opportunity_score = opportunity.get('opportunity_score', 0) + estimated_impact = opportunity.get('estimated_impact', 'medium') + + if opportunity_score > self.thresholds['seo_opportunity_threshold']: + signal = MarketSignal( + signal_id=f"seo_opportunity_{opportunity_type}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}", + signal_type=SignalType.SEO_OPPORTUNITY, + source="SEO Analysis", + description=f"SEO opportunity identified: {opportunity_type} with score {opportunity_score:.2f}", + impact_score=min(0.8, opportunity_score * 1.5), + urgency_level=self._map_impact_to_urgency(estimated_impact), + confidence_score=0.8, + related_topics=opportunity.get('related_keywords', []), + suggested_actions=["Implement SEO recommendation", "Monitor impact", "Scale successful optimizations"], + metadata={ + 'opportunity_type': opportunity_type, + 'opportunity_score': opportunity_score, + 'estimated_impact': estimated_impact, + 'implementation_effort': opportunity.get('implementation_effort', 'medium'), + 'priority_score': opportunity.get('priority_score', 0) + } + ) + signals.append(signal) + + except Exception as e: + logger.error(f"Error detecting SEO opportunity signals: {e}") + + return signals + + # Helper methods + + def _determine_urgency(self, score: float) -> UrgencyLevel: + """Determine urgency level based on score""" + if score >= 0.8: + return UrgencyLevel.CRITICAL + elif score >= 0.6: + return UrgencyLevel.HIGH + elif score >= 0.3: + return UrgencyLevel.MEDIUM + else: + return UrgencyLevel.LOW + + def _map_impact_to_urgency(self, impact: str) -> UrgencyLevel: + """Map impact assessment to urgency level""" + impact_map = { + 'critical': UrgencyLevel.CRITICAL, + 'high': UrgencyLevel.HIGH, + 'medium': UrgencyLevel.MEDIUM, + 'low': UrgencyLevel.LOW + } + return impact_map.get(impact.lower(), UrgencyLevel.MEDIUM) + + def _get_competitor_response_actions(self, metric: str, change_percentage: float) -> List[str]: + """Get suggested actions for competitor changes""" + actions = [] + + if metric == 'content_volume': + if change_percentage > 0: + actions = ["Analyze competitor content strategy", "Identify content gaps", "Increase content production"] + else: + actions = ["Monitor competitor focus shift", "Identify new opportunities", "Maintain content quality"] + + elif metric == 'semantic_overlap': + if change_percentage > 0: + actions = ["Differentiate content strategy", "Find unique angles", "Avoid keyword cannibalization"] + else: + actions = ["Explore new topics", "Expand content coverage", "Monitor competitor positioning"] + + elif metric == 'authority_score': + if change_percentage > 0: + actions = ["Analyze competitor backlink strategy", "Improve content quality", "Build domain authority"] + else: + actions = ["Capitalize on competitor weakness", "Strengthen own authority", "Monitor recovery"] + + else: + actions = ["Monitor competitor activity", "Analyze impact on market", "Adjust strategy if needed"] + + return actions + + def _filter_signals(self, signals: List[MarketSignal]) -> List[MarketSignal]: + """Filter signals based on relevance and quality""" + filtered = [] + + for signal in signals: + # Skip low confidence signals + if signal.confidence_score < 0.5: + continue + + # Skip expired signals + if self._is_signal_expired(signal): + continue + + # Skip duplicate signals (same type and source within short timeframe) + if self._is_duplicate_signal(signal, filtered): + continue + + filtered.append(signal) + + return filtered + def _prioritize_signals(self, signals: List[MarketSignal]) -> List[MarketSignal]: """Prioritize signals based on impact and urgency""" - return sorted(signals, key=lambda x: (x.urgency_level.value, x.impact_score), reverse=True) - - def _trim_signal_history(self): - """Keep signal history within limits""" - if len(self.signal_history) > 1000: - self.signal_history = self.signal_history[-1000:] - -class MarketTrendAnalyzer: - """ - Analyzer for detecting market trends from aggregated signals. - """ - def __init__(self, user_id: str): - self.user_id = user_id - self.detector = MarketSignalDetector(user_id) + # Sort by priority score (impact * urgency_weight) + def priority_score(signal: MarketSignal) -> float: + urgency_weights = { + UrgencyLevel.CRITICAL: 1.0, + UrgencyLevel.HIGH: 0.8, + UrgencyLevel.MEDIUM: 0.5, + UrgencyLevel.LOW: 0.2 + } + + urgency_weight = urgency_weights.get(signal.urgency_level, 0.5) + return signal.impact_score * urgency_weight * signal.confidence_score - async def analyze_trends(self, context: Optional[Dict[str, Any]] = None) -> List[MarketSignal]: - """Analyze current market trends""" - # Placeholder implementation - logger.info(f"Analyzing market trends for user {self.user_id}") - return [] + return sorted(signals, key=priority_score, reverse=True) + + def _is_signal_expired(self, signal: MarketSignal) -> bool: + """Check if signal has expired""" + try: + expires_at = datetime.fromisoformat(signal.expires_at) + return datetime.utcnow() > expires_at + except: + return False + + def _is_duplicate_signal(self, signal: MarketSignal, existing_signals: List[MarketSignal]) -> bool: + """Check if signal is a duplicate of recent signals""" + try: + signal_time = datetime.fromisoformat(signal.detected_at) + + for existing in existing_signals: + if (existing.signal_type == signal.signal_type and + existing.source == signal.source and + existing.related_topics == signal.related_topics): + + # Check if within 1 hour + existing_time = datetime.fromisoformat(existing.detected_at) + if abs((signal_time - existing_time).total_seconds()) < 3600: + return True + + return False + except: + return False + + def _is_cache_valid(self, cached_signals: List[MarketSignal]) -> bool: + """Check if cached signals are still valid""" + if not cached_signals: + return False + + try: + # Check if any signal is still valid (not expired) + for signal in cached_signals: + if not self._is_signal_expired(signal): + return True + + return False + except: + return False + + def _trim_signal_history(self): + """Trim signal history to keep only recent signals""" + cutoff_time = datetime.utcnow().timestamp() - (7 * 24 * 60 * 60) # 7 days + + self.signal_history = [ + signal for signal in self.signal_history + if datetime.fromisoformat(signal.detected_at).timestamp() > cutoff_time + ] + + # Data retrieval methods (to be implemented with actual ALwrity services) + + async def _get_competitor_data(self) -> Dict[str, Any]: + """Get competitor data from existing services""" + # This will be implemented to integrate with existing competitor analysis + return { + 'competitors': [], + 'analysis_timestamp': datetime.utcnow().isoformat() + } + + async def _get_seo_performance(self) -> Dict[str, Any]: + """Get SEO performance data""" + # This will be implemented to integrate with existing SEO analysis + return { + 'serp_changes': [], + 'opportunities': [], + 'analysis_timestamp': datetime.utcnow().isoformat() + } + + async def _get_content_analysis(self) -> Dict[str, Any]: + """Get content analysis data""" + # This will be implemented to integrate with existing content analysis + return { + 'content_metrics': {}, + 'semantic_analysis': {}, + 'analysis_timestamp': datetime.utcnow().isoformat() + } + + async def _get_historical_data(self) -> Dict[str, Any]: + """Get historical data for trend analysis""" + # This will be implemented to get historical performance data + return { + 'performance_metrics': {}, + 'social_metrics': {}, + 'industry_news': [], + 'data_timestamp': datetime.utcnow().isoformat() + } + +# Service class for market signal detection +class MarketSignalService: + """Service class for market signal detection operations""" + + def __init__(self): + self.detectors: Dict[str, MarketSignalDetector] = {} + self.signal_history: Dict[str, List[MarketSignal]] = {} + + async def get_detector(self, user_id: str) -> MarketSignalDetector: + """Get or create a market signal detector for a user""" + if user_id not in self.detectors: + self.detectors[user_id] = MarketSignalDetector(user_id) + return self.detectors[user_id] + + async def detect_signals_for_user(self, user_id: str) -> List[MarketSignal]: + """Detect market signals for a specific user""" + detector = await self.get_detector(user_id) + signals = await detector.detect_market_signals() + + # Store in history + if user_id not in self.signal_history: + self.signal_history[user_id] = [] + self.signal_history[user_id].extend(signals) + + return signals + + async def get_signal_summary(self, user_id: str) -> Dict[str, Any]: + """Get summary of recent signals for a user""" + detector = await self.get_detector(user_id) + signals = await detector.detect_market_signals() + + # Group by signal type + signals_by_type = {} + for signal in signals: + signal_type = signal.signal_type.value + if signal_type not in signals_by_type: + signals_by_type[signal_type] = [] + signals_by_type[signal_type].append(signal) + + # Calculate summary metrics + total_signals = len(signals) + high_priority_signals = len([s for s in signals if s.urgency_level in [UrgencyLevel.HIGH, UrgencyLevel.CRITICAL]]) + average_impact_score = sum(s.impact_score for s in signals) / max(total_signals, 1) + + return { + 'user_id': user_id, + 'total_signals': total_signals, + 'high_priority_signals': high_priority_signals, + 'average_impact_score': average_impact_score, + 'signals_by_type': signals_by_type, + 'latest_signals': signals[:5], # Top 5 most recent + 'timestamp': datetime.utcnow().isoformat() + } + + async def get_active_signals(self, user_id: str) -> List[MarketSignal]: + """Get active (non-expired) signals for a user""" + detector = await self.get_detector(user_id) + all_signals = await detector.detect_market_signals() + + # Filter active signals + active_signals = [] + for signal in all_signals: + try: + expires_at = datetime.fromisoformat(signal.expires_at) + if datetime.utcnow() <= expires_at: + active_signals.append(signal) + except: + continue + + return active_signals + +# Global service instance +market_signal_service = MarketSignalService() + +# Convenience functions +async def detect_market_signals(user_id: str) -> List[MarketSignal]: + """Detect market signals for a user""" + return await market_signal_service.detect_signals_for_user(user_id) + +async def get_market_signal_summary(user_id: str) -> Dict[str, Any]: + """Get market signal summary for a user""" + return await market_signal_service.get_signal_summary(user_id) + +async def get_active_market_signals(user_id: str) -> List[MarketSignal]: + """Get active market signals for a user""" + return await market_signal_service.get_active_signals(user_id) \ No newline at end of file diff --git a/backend/services/intelligence/agents/performance_monitor.py b/backend/services/intelligence/agents/performance_monitor.py index 2a3b3be1..6b129a71 100644 --- a/backend/services/intelligence/agents/performance_monitor.py +++ b/backend/services/intelligence/agents/performance_monitor.py @@ -17,112 +17,747 @@ from services.database import get_session_for_user logger = get_service_logger(__name__) -class AgentStatus(Enum): - IDLE = "idle" - BUSY = "busy" - ERROR = "error" - OFFLINE = "offline" - INITIALIZING = "initializing" - class PerformanceMetric(Enum): + """Types of performance metrics tracked""" RESPONSE_TIME = "response_time" SUCCESS_RATE = "success_rate" - TOKEN_USAGE = "token_usage" - COST_PER_ACTION = "cost_per_action" - RESOURCE_UTILIZATION = "resource_utilization" - GOAL_COMPLETION_RATE = "goal_completion_rate" + EFFICIENCY_SCORE = "efficiency_score" + RESOURCE_USAGE = "resource_usage" + USER_SATISFACTION = "user_satisfaction" + MARKET_IMPACT = "market_impact" + +class AgentStatus(Enum): + """Status of agent operations""" + ACTIVE = "active" + IDLE = "idle" + PROCESSING = "processing" + ERROR = "error" + MAINTENANCE = "maintenance" @dataclass -class AgentPerformanceMetrics: - agent_id: str - timestamp: datetime - metrics: Dict[str, float] +class PerformanceDataPoint: + """Single performance data point""" + timestamp: str + metric_type: PerformanceMetric + value: float context: Dict[str, Any] + agent_id: str + user_id: str -class PerformanceMonitor: - """ - Monitors and analyzes agent performance metrics - """ +@dataclass +class AgentPerformanceSnapshot: + """Complete performance snapshot for an agent""" + agent_id: str + user_id: str + timestamp: str + status: AgentStatus + total_actions: int + successful_actions: int + failed_actions: int + average_response_time: float + success_rate: float + efficiency_score: float + resource_usage: Dict[str, float] + market_impact_score: float + last_action_at: str + + def __post_init__(self): + if self.timestamp is None: + self.timestamp = datetime.utcnow().isoformat() + +@dataclass +class PerformanceTrend: + """Performance trend analysis""" + metric_type: PerformanceMetric + trend_direction: str # "improving", "declining", "stable" + trend_strength: float # 0.0 to 1.0 + change_rate: float # Percentage change per time unit + confidence: float # 0.0 to 1.0 + period_start: str + period_end: str + +@dataclass +class OptimizationRecommendation: + """Performance optimization recommendation""" + recommendation_id: str + agent_id: str + user_id: str + recommendation_type: str + priority: str # "high", "medium", "low" + description: str + expected_impact: float # Expected improvement in performance + implementation_steps: List[str] + estimated_effort: str # "low", "medium", "high" + created_at: str + expires_at: str + + def __post_init__(self): + if self.created_at is None: + self.created_at = datetime.utcnow().isoformat() + if self.expires_at is None: + # Default expiration: 7 days + expires = datetime.utcnow().timestamp() + (7 * 24 * 60 * 60) + self.expires_at = datetime.fromtimestamp(expires).isoformat() + +class AgentPerformanceMonitor: + """Main performance monitoring system for agents""" + + def __init__(self, user_id: str): + self.user_id = user_id + self.performance_data: Dict[str, List[PerformanceDataPoint]] = defaultdict(list) + self.agent_snapshots: Dict[str, AgentPerformanceSnapshot] = {} + self.recommendations: List[OptimizationRecommendation] = [] + self.performance_history: deque = deque(maxlen=1000) # Keep last 1000 data points + + # Performance thresholds and targets + self.performance_targets = { + "success_rate": 0.85, # 85% success rate target + "response_time": 30.0, # 30 seconds average response time target + "efficiency_score": 0.75, # 75% efficiency score target + "market_impact": 0.60 # 60% market impact score target + } + + # Alert thresholds + self.alert_thresholds = { + "success_rate": 0.70, # Alert if below 70% + "response_time": 60.0, # Alert if above 60 seconds + "efficiency_score": 0.50, # Alert if below 50% + "market_impact": 0.30 # Alert if below 30% + } + + logger.info(f"Initialized AgentPerformanceMonitor for user: {user_id}") + + async def record_performance_data(self, agent_id: str, metric_type: PerformanceMetric, value: float, context: Dict[str, Any] = None) -> bool: + """Record a performance data point""" + try: + if context is None: + context = {} + + data_point = PerformanceDataPoint( + timestamp=datetime.utcnow().isoformat(), + metric_type=metric_type, + value=value, + context=context, + agent_id=agent_id, + user_id=self.user_id + ) + + # Store in performance data + self.performance_data[agent_id].append(data_point) + self.performance_history.append(data_point) + + # Keep only recent data (last 24 hours for real-time analysis) + cutoff_time = datetime.utcnow().timestamp() - (24 * 60 * 60) + self.performance_data[agent_id] = [ + dp for dp in self.performance_data[agent_id] + if datetime.fromisoformat(dp.timestamp).timestamp() > cutoff_time + ] + + logger.debug(f"Recorded performance data for agent {agent_id}: {metric_type.value} = {value}") + return True + + except Exception as e: + logger.error(f"Error recording performance data for agent {agent_id}: {e}") + return False + + async def update_agent_snapshot(self, agent_id: str, status: AgentStatus, action_result: Dict[str, Any] = None) -> AgentPerformanceSnapshot: + """Update performance snapshot for an agent""" + try: + # Get recent performance data + recent_data = self.performance_data[agent_id] + + # Calculate metrics from recent data + total_actions = len([dp for dp in recent_data if dp.metric_type == PerformanceMetric.SUCCESS_RATE]) + successful_actions = len([dp for dp in recent_data if dp.metric_type == PerformanceMetric.SUCCESS_RATE and dp.value > 0.5]) + failed_actions = total_actions - successful_actions + + # Calculate average response time + response_time_data = [dp.value for dp in recent_data if dp.metric_type == PerformanceMetric.RESPONSE_TIME] + avg_response_time = sum(response_time_data) / len(response_time_data) if response_time_data else 0.0 + + # Calculate success rate + success_rate = successful_actions / total_actions if total_actions > 0 else 0.0 + + # Calculate efficiency score + efficiency_data = [dp.value for dp in recent_data if dp.metric_type == PerformanceMetric.EFFICIENCY_SCORE] + avg_efficiency = sum(efficiency_data) / len(efficiency_data) if efficiency_data else 0.0 + + # Calculate market impact + market_impact_data = [dp.value for dp in recent_data if dp.metric_type == PerformanceMetric.MARKET_IMPACT] + avg_market_impact = sum(market_impact_data) / len(market_impact_data) if market_impact_data else 0.0 + + # Get resource usage + resource_usage = self._calculate_resource_usage(agent_id, recent_data) + + # Get last action time + last_action_at = max([dp.timestamp for dp in recent_data], default=datetime.utcnow().isoformat()) if recent_data else datetime.utcnow().isoformat() + + # Create snapshot + snapshot = AgentPerformanceSnapshot( + agent_id=agent_id, + user_id=self.user_id, + timestamp=datetime.utcnow().isoformat(), + status=status, + total_actions=total_actions, + successful_actions=successful_actions, + failed_actions=failed_actions, + average_response_time=avg_response_time, + success_rate=success_rate, + efficiency_score=avg_efficiency, + resource_usage=resource_usage, + market_impact_score=avg_market_impact, + last_action_at=last_action_at + ) + + self.agent_snapshots[agent_id] = snapshot + + logger.info(f"Updated performance snapshot for agent {agent_id}: success_rate={success_rate:.2f}, efficiency={avg_efficiency:.2f}") + return snapshot + + except Exception as e: + logger.error(f"Error updating performance snapshot for agent {agent_id}: {e}") + # Return a default snapshot + return AgentPerformanceSnapshot( + agent_id=agent_id, + user_id=self.user_id, + timestamp=datetime.utcnow().isoformat(), + status=AgentStatus.ERROR, + total_actions=0, + successful_actions=0, + failed_actions=0, + average_response_time=0.0, + success_rate=0.0, + efficiency_score=0.0, + resource_usage={}, + market_impact_score=0.0, + last_action_at=datetime.utcnow().isoformat() + ) + + def _calculate_resource_usage(self, agent_id: str, recent_data: List[PerformanceDataPoint]) -> Dict[str, float]: + """Calculate resource usage metrics""" + resource_usage = { + "cpu_usage": 0.0, + "memory_usage": 0.0, + "api_calls": 0, + "processing_time": 0.0 + } + + try: + # Extract resource usage from context + for dp in recent_data: + if dp.metric_type == PerformanceMetric.RESOURCE_USAGE and dp.context: + resource_usage["cpu_usage"] = max(resource_usage["cpu_usage"], dp.context.get("cpu_usage", 0.0)) + resource_usage["memory_usage"] = max(resource_usage["memory_usage"], dp.context.get("memory_usage", 0.0)) + resource_usage["api_calls"] += dp.context.get("api_calls", 0) + resource_usage["processing_time"] += dp.context.get("processing_time", 0.0) + + # Calculate averages if multiple data points + if len(recent_data) > 0: + resource_usage["processing_time"] = resource_usage["processing_time"] / len(recent_data) + + except Exception as e: + logger.error(f"Error calculating resource usage for agent {agent_id}: {e}") + + return resource_usage + + async def analyze_performance_trends(self, agent_id: str, period_hours: int = 24) -> List[PerformanceTrend]: + """Analyze performance trends for an agent""" + try: + cutoff_time = datetime.utcnow().timestamp() - (period_hours * 60 * 60) + agent_data = [ + dp for dp in self.performance_data[agent_id] + if datetime.fromisoformat(dp.timestamp).timestamp() > cutoff_time + ] + + if len(agent_data) < 5: # Need at least 5 data points for trend analysis + return [] + + trends = [] + + # Analyze trends for each metric type + for metric_type in PerformanceMetric: + metric_data = [dp for dp in agent_data if dp.metric_type == metric_type] + + if len(metric_data) < 3: # Need at least 3 points for trend + continue + + # Sort by timestamp + metric_data.sort(key=lambda x: x.timestamp) + + # Calculate trend + trend_result = self._calculate_trend(metric_data) + + if trend_result: + trend = PerformanceTrend( + metric_type=metric_type, + trend_direction=trend_result["direction"], + trend_strength=trend_result["strength"], + change_rate=trend_result["change_rate"], + confidence=trend_result["confidence"], + period_start=metric_data[0].timestamp, + period_end=metric_data[-1].timestamp + ) + trends.append(trend) + + logger.info(f"Analyzed performance trends for agent {agent_id}: found {len(trends)} trends") + return trends + + except Exception as e: + logger.error(f"Error analyzing performance trends for agent {agent_id}: {e}") + return [] + + def _calculate_trend(self, data_points: List[PerformanceDataPoint]) -> Optional[Dict[str, Any]]: + """Calculate trend from performance data points""" + try: + if len(data_points) < 3: + return None + + # Extract values and timestamps + values = [dp.value for dp in data_points] + timestamps = [datetime.fromisoformat(dp.timestamp).timestamp() for dp in data_points] + + # Simple linear trend calculation + n = len(values) + sum_x = sum(timestamps) + sum_y = sum(values) + sum_xy = sum(x * y for x, y in zip(timestamps, values)) + sum_x2 = sum(x * x for x in timestamps) + + # Calculate slope and intercept + slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x * sum_x) + intercept = (sum_y - slope * sum_x) / n + + # Calculate correlation coefficient (confidence) + mean_y = sum_y / n + ss_tot = sum((y - mean_y) ** 2 for y in values) + ss_res = sum((y - (slope * x + intercept)) ** 2 for x, y in zip(timestamps, values)) + r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0 + + # Determine trend direction + if abs(slope) < 0.001: # Nearly flat + direction = "stable" + strength = 0.0 + elif slope > 0: + direction = "improving" + strength = min(1.0, abs(slope) * 100) # Scale slope to 0-1 + else: + direction = "declining" + strength = min(1.0, abs(slope) * 100) + + # Calculate change rate (percentage change per hour) + time_span = timestamps[-1] - timestamps[0] + if time_span > 0: + change_rate = (slope * 3600) / (values[0] if values[0] != 0 else 1) * 100 # Per hour + else: + change_rate = 0.0 + + return { + "direction": direction, + "strength": strength, + "change_rate": change_rate, + "confidence": r_squared + } + + except Exception as e: + logger.error(f"Error calculating trend: {e}") + return None + + async def generate_optimization_recommendations(self, agent_id: str) -> List[OptimizationRecommendation]: + """Generate optimization recommendations for an agent""" + try: + recommendations = [] + + # Get current snapshot + snapshot = self.agent_snapshots.get(agent_id) + if not snapshot: + return [] + + # Get performance trends + trends = await self.analyze_performance_trends(agent_id) + + # Generate recommendations based on performance analysis + + # 1. Success rate recommendations + if snapshot.success_rate < self.performance_targets["success_rate"]: + recommendation = OptimizationRecommendation( + recommendation_id=f"success_rate_{agent_id}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}", + agent_id=agent_id, + user_id=self.user_id, + recommendation_type="success_rate_improvement", + priority="high" if snapshot.success_rate < self.alert_thresholds["success_rate"] else "medium", + description=f"Agent success rate is {snapshot.success_rate:.1%}, target is {self.performance_targets['success_rate']:.1%}", + expected_impact=self.performance_targets["success_rate"] - snapshot.success_rate, + implementation_steps=[ + "Analyze recent failed actions to identify patterns", + "Review error logs for common failure causes", + "Update agent parameters or logic to address identified issues", + "Test improvements with small batch of actions", + "Monitor success rate improvement over time" + ], + estimated_effort="medium" + ) + recommendations.append(recommendation) + + # 2. Response time recommendations + if snapshot.average_response_time > self.performance_targets["response_time"]: + recommendation = OptimizationRecommendation( + recommendation_id=f"response_time_{agent_id}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}", + agent_id=agent_id, + user_id=self.user_id, + recommendation_type="response_time_optimization", + priority="high" if snapshot.average_response_time > self.alert_thresholds["response_time"] else "medium", + description=f"Agent average response time is {snapshot.average_response_time:.1f}s, target is {self.performance_targets['response_time']:.1f}s", + expected_impact=(self.performance_targets["response_time"] - snapshot.average_response_time) / snapshot.average_response_time, + implementation_steps=[ + "Profile agent execution to identify bottlenecks", + "Optimize API calls and external service interactions", + "Implement caching for frequently accessed data", + "Review and optimize agent logic and decision-making", + "Monitor response time improvement" + ], + estimated_effort="high" + ) + recommendations.append(recommendation) + + # 3. Efficiency score recommendations + if snapshot.efficiency_score < self.performance_targets["efficiency_score"]: + recommendation = OptimizationRecommendation( + recommendation_id=f"efficiency_{agent_id}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}", + agent_id=agent_id, + user_id=self.user_id, + recommendation_type="efficiency_improvement", + priority="high" if snapshot.efficiency_score < self.alert_thresholds["efficiency_score"] else "medium", + description=f"Agent efficiency score is {snapshot.efficiency_score:.2f}, target is {self.performance_targets['efficiency_score']:.2f}", + expected_impact=self.performance_targets["efficiency_score"] - snapshot.efficiency_score, + implementation_steps=[ + "Analyze agent decision-making patterns", + "Identify redundant or unnecessary operations", + "Optimize agent parameters and thresholds", + "Implement better error handling and recovery", + "Monitor efficiency score improvement" + ], + estimated_effort="medium" + ) + recommendations.append(recommendation) + + # 4. Market impact recommendations + if snapshot.market_impact_score < self.performance_targets["market_impact"]: + recommendation = OptimizationRecommendation( + recommendation_id=f"market_impact_{agent_id}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}", + agent_id=agent_id, + user_id=self.user_id, + recommendation_type="market_impact_enhancement", + priority="medium", + description=f"Agent market impact score is {snapshot.market_impact_score:.2f}, target is {self.performance_targets['market_impact']:.2f}", + expected_impact=self.performance_targets["market_impact"] - snapshot.market_impact_score, + implementation_steps=[ + "Analyze market signal detection accuracy", + "Improve market trend analysis and prediction", + "Enhance competitive intelligence gathering", + "Optimize timing and execution of market actions", + "Monitor market impact score improvement" + ], + estimated_effort="high" + ) + recommendations.append(recommendation) + + # 5. Trend-based recommendations + for trend in trends: + if trend.trend_strength > 0.7 and trend.confidence > 0.8: # Strong trend with high confidence + if trend.trend_direction == "declining": + recommendation = OptimizationRecommendation( + recommendation_id=f"trend_{trend.metric_type.value}_{agent_id}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}", + agent_id=agent_id, + user_id=self.user_id, + recommendation_type="trend_reversal", + priority="high" if trend.trend_strength > 0.8 else "medium", + description=f"Strong declining trend detected in {trend.metric_type.value}: {trend.change_rate:.1f}% change per hour", + expected_impact=0.3, # Estimate 30% improvement potential + implementation_steps=[ + f"Investigate causes of declining {trend.metric_type.value}", + "Identify specific factors contributing to the trend", + "Implement corrective measures based on findings", + "Monitor trend reversal over time", + "Adjust approach if trend continues" + ], + estimated_effort="medium" + ) + recommendations.append(recommendation) + + # Sort by priority and expected impact + recommendations.sort(key=lambda x: (self._priority_weight(x.priority), x.expected_impact), reverse=True) + + # Keep only top 10 recommendations + recommendations = recommendations[:10] + + # Store recommendations + self.recommendations.extend(recommendations) + + # Keep only recent recommendations (last 50) + if len(self.recommendations) > 50: + self.recommendations = self.recommendations[-50:] + + logger.info(f"Generated {len(recommendations)} optimization recommendations for agent {agent_id}") + return recommendations + + except Exception as e: + logger.error(f"Error generating optimization recommendations for agent {agent_id}: {e}") + return [] + + def _priority_weight(self, priority: str) -> int: + """Convert priority to numeric weight for sorting""" + priority_weights = { + "high": 3, + "medium": 2, + "low": 1 + } + return priority_weights.get(priority, 0) + + async def get_performance_alerts(self, agent_id: str) -> List[Dict[str, Any]]: + """Get performance alerts for an agent""" + alerts = [] + + try: + snapshot = self.agent_snapshots.get(agent_id) + if not snapshot: + return [] + + # Check success rate alert + if snapshot.success_rate < self.alert_thresholds["success_rate"]: + alerts.append({ + "type": "performance_alert", + "metric": "success_rate", + "current_value": snapshot.success_rate, + "threshold": self.alert_thresholds["success_rate"], + "target": self.performance_targets["success_rate"], + "severity": "high" if snapshot.success_rate < 0.5 else "medium", + "message": f"Agent success rate ({snapshot.success_rate:.1%}) is below alert threshold ({self.alert_thresholds['success_rate']:.1%})", + "timestamp": datetime.utcnow().isoformat() + }) + + # Check response time alert + if snapshot.average_response_time > self.alert_thresholds["response_time"]: + alerts.append({ + "type": "performance_alert", + "metric": "response_time", + "current_value": snapshot.average_response_time, + "threshold": self.alert_thresholds["response_time"], + "target": self.performance_targets["response_time"], + "severity": "high" if snapshot.average_response_time > 120 else "medium", + "message": f"Agent response time ({snapshot.average_response_time:.1f}s) exceeds alert threshold ({self.alert_thresholds['response_time']:.1f}s)", + "timestamp": datetime.utcnow().isoformat() + }) + + # Check efficiency score alert + if snapshot.efficiency_score < self.alert_thresholds["efficiency_score"]: + alerts.append({ + "type": "performance_alert", + "metric": "efficiency_score", + "current_value": snapshot.efficiency_score, + "threshold": self.alert_thresholds["efficiency_score"], + "target": self.performance_targets["efficiency_score"], + "severity": "high" if snapshot.efficiency_score < 0.3 else "medium", + "message": f"Agent efficiency score ({snapshot.efficiency_score:.2f}) is below alert threshold ({self.alert_thresholds['efficiency_score']:.2f})", + "timestamp": datetime.utcnow().isoformat() + }) + + # Check market impact alert + if snapshot.market_impact_score < self.alert_thresholds["market_impact"]: + alerts.append({ + "type": "performance_alert", + "metric": "market_impact", + "current_value": snapshot.market_impact_score, + "threshold": self.alert_thresholds["market_impact"], + "target": self.performance_targets["market_impact"], + "severity": "medium", + "message": f"Agent market impact score ({snapshot.market_impact_score:.2f}) is below alert threshold ({self.alert_thresholds['market_impact']:.2f})", + "timestamp": datetime.utcnow().isoformat() + }) + + return alerts + + except Exception as e: + logger.error(f"Error getting performance alerts for agent {agent_id}: {e}") + return [] + + async def get_performance_summary(self, agent_id: str) -> Dict[str, Any]: + """Get comprehensive performance summary for an agent""" + try: + snapshot = self.agent_snapshots.get(agent_id) + if not snapshot: + return {} + + # Get trends + trends = await self.analyze_performance_trends(agent_id) + + # Get recommendations + recommendations = await self.generate_optimization_recommendations(agent_id) + + # Get alerts + alerts = await self.get_performance_alerts(agent_id) + + # Calculate overall health score + health_score = self._calculate_health_score(snapshot) + + return { + "agent_id": agent_id, + "user_id": self.user_id, + "timestamp": datetime.utcnow().isoformat(), + "overall_health": health_score, + "current_performance": asdict(snapshot), + "performance_trends": [asdict(trend) for trend in trends], + "optimization_recommendations": [asdict(rec) for rec in recommendations], + "performance_alerts": alerts, + "performance_targets": self.performance_targets, + "alert_thresholds": self.alert_thresholds + } + + except Exception as e: + logger.error(f"Error getting performance summary for agent {agent_id}: {e}") + return {} + + def _calculate_health_score(self, snapshot: AgentPerformanceSnapshot) -> float: + """Calculate overall health score based on key metrics""" + try: + # Weighted scoring based on key metrics + weights = { + "success_rate": 0.3, + "response_time": 0.25, + "efficiency_score": 0.25, + "market_impact": 0.2 + } + + scores = { + "success_rate": min(1.0, snapshot.success_rate / self.performance_targets["success_rate"]), + "response_time": max(0.0, 1.0 - (snapshot.average_response_time / self.performance_targets["response_time"])), + "efficiency_score": min(1.0, snapshot.efficiency_score / self.performance_targets["efficiency_score"]), + "market_impact": min(1.0, snapshot.market_impact_score / self.performance_targets["market_impact"]) + } + + # Calculate weighted health score + health_score = sum(scores[metric] * weights[metric] for metric in weights.keys()) + + return round(health_score, 2) + + except Exception as e: + logger.error(f"Error calculating health score: {e}") + return 0.0 + + def get_all_agents_performance(self) -> List[Dict[str, Any]]: + """Get performance summary for all agents""" + all_performance = [] + + for agent_id, snapshot in self.agent_snapshots.items(): + performance_summary = { + "agent_id": agent_id, + "user_id": self.user_id, + "status": snapshot.status.value, + "success_rate": snapshot.success_rate, + "efficiency_score": snapshot.efficiency_score, + "response_time": snapshot.average_response_time, + "market_impact": snapshot.market_impact_score, + "total_actions": snapshot.total_actions, + "last_action": snapshot.last_action_at, + "health_score": self._calculate_health_score(snapshot) + } + all_performance.append(performance_summary) + + return all_performance + +# Service class for performance monitoring +class AgentPerformanceService: + """Service class for agent performance monitoring operations""" def __init__(self): - self.metrics_buffer = deque(maxlen=1000) - self.performance_history = defaultdict(list) - self.alert_thresholds = { - PerformanceMetric.SUCCESS_RATE: 0.8, # Alert if success rate < 80% - PerformanceMetric.RESPONSE_TIME: 30.0, # Alert if response time > 30s - PerformanceMetric.GOAL_COMPLETION_RATE: 0.7 # Alert if completion < 70% - } + self.monitors: Dict[str, AgentPerformanceMonitor] = {} + self.global_performance_history: deque = deque(maxlen=5000) # Global history - async def record_metric(self, - agent_id: str, - metric_type: PerformanceMetric, - value: float, - context: Optional[Dict[str, Any]] = None): - """Record a performance metric for an agent""" - metric_entry = AgentPerformanceMetrics( - agent_id=agent_id, - timestamp=datetime.utcnow(), - metrics={metric_type.value: value}, - context=context or {} - ) + async def get_monitor(self, user_id: str) -> AgentPerformanceMonitor: + """Get or create a performance monitor for a user""" + if user_id not in self.monitors: + self.monitors[user_id] = AgentPerformanceMonitor(user_id) + return self.monitors[user_id] + + async def record_agent_performance(self, user_id: str, agent_id: str, metric_type: PerformanceMetric, value: float, context: Dict[str, Any] = None) -> bool: + """Record performance data for an agent""" + monitor = await self.get_monitor(user_id) + success = await monitor.record_performance_data(agent_id, metric_type, value, context) - self.metrics_buffer.append(metric_entry) - self.performance_history[agent_id].append(metric_entry) - - # Check thresholds - await self._check_thresholds(agent_id, metric_type, value) - - # Persist if needed (batching implemented in production) - # await self._persist_metric(metric_entry) - - async def get_agent_performance(self, agent_id: str, time_window_minutes: int = 60) -> Dict[str, Any]: - """Get aggregated performance metrics for an agent""" - cutoff_time = datetime.utcnow() - timedelta(minutes=time_window_minutes) - relevant_metrics = [ - m for m in self.performance_history[agent_id] - if m.timestamp > cutoff_time - ] - - if not relevant_metrics: - return {} - - aggregated = defaultdict(list) - for m in relevant_metrics: - for k, v in m.metrics.items(): - aggregated[k].append(v) - - result = { - "agent_id": agent_id, - "period_minutes": time_window_minutes, - "sample_size": len(relevant_metrics), - "metrics": { - k: sum(v) / len(v) for k, v in aggregated.items() - } - } - - return result - - async def _check_thresholds(self, agent_id: str, metric_type: PerformanceMetric, value: float): - """Check if metric violates thresholds""" - threshold = self.alert_thresholds.get(metric_type) - if not threshold: - return - - is_violation = False - if metric_type in [PerformanceMetric.SUCCESS_RATE, PerformanceMetric.GOAL_COMPLETION_RATE]: - if value < threshold: - is_violation = True - elif value > threshold: - is_violation = True - - if is_violation: - logger.warning( - f"Performance alert for agent {agent_id}: " - f"{metric_type.value} = {value} (Threshold: {threshold})" + if success: + # Also record in global history + data_point = PerformanceDataPoint( + timestamp=datetime.utcnow().isoformat(), + metric_type=metric_type, + value=value, + context=context or {}, + agent_id=agent_id, + user_id=user_id ) - # Trigger alert notification (impl via notification service) + self.global_performance_history.append(data_point) + + return success + + async def update_agent_performance_snapshot(self, user_id: str, agent_id: str, status: AgentStatus, action_result: Dict[str, Any] = None) -> AgentPerformanceSnapshot: + """Update performance snapshot for an agent""" + monitor = await self.get_monitor(user_id) + return await monitor.update_agent_snapshot(agent_id, status, action_result) + + async def get_agent_performance_summary(self, user_id: str, agent_id: str) -> Dict[str, Any]: + """Get comprehensive performance summary for an agent""" + monitor = await self.get_monitor(user_id) + return await monitor.get_performance_summary(agent_id) + + async def get_all_agents_performance_summary(self, user_id: str) -> List[Dict[str, Any]]: + """Get performance summary for all agents for a user""" + monitor = await self.get_monitor(user_id) + return monitor.get_all_agents_performance() + + async def get_global_performance_stats(self) -> Dict[str, Any]: + """Get global performance statistics across all users and agents""" + if not self.global_performance_history: + return {} + + # Calculate global statistics + total_actions = len([dp for dp in self.global_performance_history if dp.metric_type == PerformanceMetric.SUCCESS_RATE]) + successful_actions = len([dp for dp in self.global_performance_history if dp.metric_type == PerformanceMetric.SUCCESS_RATE and dp.value > 0.5]) + + response_times = [dp.value for dp in self.global_performance_history if dp.metric_type == PerformanceMetric.RESPONSE_TIME] + avg_response_time = sum(response_times) / len(response_times) if response_times else 0.0 + + efficiency_scores = [dp.value for dp in self.global_performance_history if dp.metric_type == PerformanceMetric.EFFICIENCY_SCORE] + avg_efficiency = sum(efficiency_scores) / len(efficiency_scores) if efficiency_scores else 0.0 + + unique_users = len(set(dp.user_id for dp in self.global_performance_history)) + unique_agents = len(set(dp.agent_id for dp in self.global_performance_history)) + + return { + "total_actions": total_actions, + "successful_actions": successful_actions, + "overall_success_rate": successful_actions / total_actions if total_actions > 0 else 0.0, + "average_response_time": avg_response_time, + "average_efficiency_score": avg_efficiency, + "unique_users": unique_users, + "unique_agents": unique_agents, + "total_data_points": len(self.global_performance_history), + "timestamp": datetime.utcnow().isoformat() + } -# Singleton instance -performance_monitor = PerformanceMonitor() -AgentPerformanceMonitor = PerformanceMonitor -performance_service = performance_monitor +# Global service instance +performance_service = AgentPerformanceService() + +# Convenience functions for external use +async def record_agent_performance(user_id: str, agent_id: str, metric_type: PerformanceMetric, value: float, context: Dict[str, Any] = None) -> bool: + """Record performance data for an agent""" + return await performance_service.record_agent_performance(user_id, agent_id, metric_type, value, context) + +async def update_agent_performance_snapshot(user_id: str, agent_id: str, status: AgentStatus, action_result: Dict[str, Any] = None) -> AgentPerformanceSnapshot: + """Update performance snapshot for an agent""" + return await performance_service.update_agent_performance_snapshot(user_id, agent_id, status, action_result) + +async def get_agent_performance_summary(user_id: str, agent_id: str) -> Dict[str, Any]: + """Get comprehensive performance summary for an agent""" + return await performance_service.get_agent_performance_summary(user_id, agent_id) + +async def get_all_agents_performance_summary(user_id: str) -> List[Dict[str, Any]]: + """Get performance summary for all agents for a user""" + return await performance_service.get_all_agents_performance_summary(user_id) \ No newline at end of file diff --git a/frontend/src/components/SEODashboard/components/SemanticDashboard.tsx b/frontend/src/components/SEODashboard/components/SemanticDashboard.tsx new file mode 100644 index 00000000..0485726c --- /dev/null +++ b/frontend/src/components/SEODashboard/components/SemanticDashboard.tsx @@ -0,0 +1,470 @@ +import React, { useEffect, useState } from 'react'; +import { + Box, + Card, + CardContent, + Typography, + Grid, + LinearProgress, + Chip, + IconButton, + Tooltip, + Alert, + Skeleton, + Button +} from '@mui/material'; +import { + TrendingUp as TrendingUpIcon, + TrendingDown as TrendingDownIcon, + Warning as WarningIcon, + CheckCircle as CheckCircleIcon, + Refresh as RefreshIcon, + Info as InfoIcon, + Speed as SpeedIcon, + Assessment as AssessmentIcon, + Group as GroupIcon, + Lightbulb as LightbulbIcon +} from '@mui/icons-material'; +import { motion, AnimatePresence } from 'framer-motion'; +import { GlassCard, ShimmerHeader } from '../../shared/styled'; +import { useSemanticDashboardStore } from '../../../stores/semanticDashboardStore'; +import { SemanticHealthMetric, CompetitorSemanticSnapshot, ContentSemanticInsight } from '../../../api/semanticDashboard'; + +interface SemanticDashboardProps { + className?: string; +} + +const SemanticDashboard: React.FC = ({ className }) => { + const { + semanticHealth, + competitorSnapshots, + contentInsights, + loading, + error, + lastUpdated, + monitoringStatus, + fetchSemanticHealth, + fetchCompetitorSnapshots, + fetchContentInsights, + fetchAllSemanticData, + refreshSemanticAnalysis, + getHealthStatusColor, + getInsightTypeColor, + getConfidenceColor + } = useSemanticDashboardStore(); + + const [refreshing, setRefreshing] = useState(false); + + // Fetch semantic data on component mount + useEffect(() => { + fetchAllSemanticData(); + }, []); + + // Auto-refresh every 24 hours (86400000ms) + useEffect(() => { + const interval = setInterval(() => { + if (monitoringStatus === 'active') { + fetchAllSemanticData(); + } + }, 86400000); // 24 hours + + return () => clearInterval(interval); + }, [monitoringStatus, fetchAllSemanticData]); + + const handleRefresh = async () => { + setRefreshing(true); + try { + await refreshSemanticAnalysis(); + } catch (error) { + console.error('Failed to refresh semantic analysis:', error); + } finally { + setRefreshing(false); + } + }; + + const getStatusIcon = (status: string) => { + switch (status) { + case 'healthy': + return ; + case 'warning': + return ; + case 'critical': + return ; + default: + return ; + } + }; + + const getTrendIcon = (trend: string) => { + switch (trend) { + case 'up': + return ; + case 'down': + return ; + default: + return ; + } + }; + + const formatLastUpdated = (timestamp: string | null) => { + if (!timestamp) return 'Never'; + const date = new Date(timestamp); + const now = new Date(); + const diffMs = now.getTime() - date.getTime(); + const diffHours = Math.floor(diffMs / (1000 * 60 * 60)); + + if (diffHours < 1) return 'Just now'; + if (diffHours < 24) return `${diffHours}h ago`; + return `${Math.floor(diffHours / 24)}d ago`; + }; + + if (error) { + return ( + + + + {error} + + + + + ); + } + + return ( + + {/* Header */} + + + + + Semantic Intelligence + + + + + + Updated: {formatLastUpdated(lastUpdated)} + + + + + + + + + + {/* Semantic Health Card */} + + + + + + + Semantic Health + + + + {loading && !semanticHealth ? ( + + + + + + ) : semanticHealth ? ( + + + {getStatusIcon(semanticHealth.status)} + + + {semanticHealth.metric_name.replace(/_/g, ' ').toUpperCase()} + + + {semanticHealth.description} + + + + + {Math.round(semanticHealth.value * 100)}% + + + + + + {semanticHealth.recommendations.length > 0 && ( + + + Recommendations: + + {semanticHealth.recommendations.map((rec, index) => ( + + • {rec} + + ))} + + )} + + ) : ( + + No semantic health data available + + )} + + + + {/* Competitor Semantic Analysis */} + + + + + + + Competitor Semantic Positioning + + + + {loading && competitorSnapshots.length === 0 ? ( + + {[1, 2, 3].map((i) => ( + + + + ))} + + ) : competitorSnapshots.length > 0 ? ( + + + {competitorSnapshots.map((competitor, index) => ( + + + + + + {competitor.competitor_name} + + + + + Semantic Overlap + + + {Math.round(competitor.semantic_overlap * 100)}% + + + + + + Authority Score + + + {Math.round(competitor.authority_score * 100)} + + + + + + Content Volume + + + {competitor.content_volume} pages + + + + {competitor.trending_topics.length > 0 && ( + + + Trending Topics: + + + {competitor.trending_topics.slice(0, 3).map((topic, idx) => ( + + ))} + + + )} + + + + + ))} + + + ) : ( + + No competitor semantic data available + + )} + + + + {/* Content Insights */} + + + + + + + Content Intelligence Insights + + + + {loading && contentInsights.length === 0 ? ( + + {[1, 2, 3].map((i) => ( + + + + ))} + + ) : contentInsights.length > 0 ? ( + + {contentInsights.map((insight, index) => ( + + + + + + + + {insight.title} + + + + + + + + + + + + + {insight.description} + + + + + Impact Score: {Math.round(insight.impact_score * 100)}/100 + + + Expires: {new Date(insight.expires_at).toLocaleDateString()} + + + + {insight.suggested_actions.length > 0 && ( + + + Suggested Actions: + + {insight.suggested_actions.map((action, idx) => ( + + • {action} + + ))} + + )} + + + + ))} + + ) : ( + + No content insights available + + )} + + + + ); +}; + +export default SemanticDashboard;