diff --git a/backend/services/intelligence/agents/performance_monitor.py b/backend/services/intelligence/agents/performance_monitor.py index 29029829..83125484 100644 --- a/backend/services/intelligence/agents/performance_monitor.py +++ b/backend/services/intelligence/agents/performance_monitor.py @@ -99,6 +99,17 @@ class OptimizationRecommendation: expires = datetime.utcnow().timestamp() + (7 * 24 * 60 * 60) self.expires_at = datetime.fromtimestamp(expires).isoformat() + +@dataclass +class TierPolicyConfig: + """Structured policy for anomaly tiers and remediation controls""" + tier: int + trigger_metrics: List[str] + thresholds: Dict[str, float] + max_iterations: int + lock_criteria: Dict[str, Any] + + class AgentPerformanceMonitor: """Main performance monitoring system for agents""" @@ -108,6 +119,32 @@ class AgentPerformanceMonitor: self.agent_snapshots: Dict[str, AgentPerformanceSnapshot] = {} self.recommendations: List[OptimizationRecommendation] = [] self.performance_history: deque = deque(maxlen=1000) # Keep last 1000 data points + self.systemic_alerts: List[Dict[str, Any]] = [] + + # Structured tier policy config + self.tier_policy_config: Dict[int, TierPolicyConfig] = { + 1: TierPolicyConfig( + tier=1, + trigger_metrics=["success_rate", "efficiency_score", "response_time"], + thresholds={"success_rate": 0.80, "efficiency_score": 0.65, "response_time": 45.0}, + max_iterations=3, + lock_criteria={"min_confidence": 0.85, "consecutive_failures": 6} + ), + 2: TierPolicyConfig( + tier=2, + trigger_metrics=["success_rate", "efficiency_score", "response_time", "market_impact"], + thresholds={"success_rate": 0.70, "efficiency_score": 0.50, "response_time": 60.0, "market_impact": 0.35}, + max_iterations=2, + lock_criteria={"min_confidence": 0.75, "consecutive_failures": 4} + ), + 3: TierPolicyConfig( + tier=3, + trigger_metrics=["success_rate", "efficiency_score", "response_time", "market_impact"], + thresholds={"success_rate": 0.55, "efficiency_score": 0.35, "response_time": 90.0, "market_impact": 0.25}, + max_iterations=1, + lock_criteria={"min_confidence": 0.65, "consecutive_failures": 3} + ) + } # Performance thresholds and targets self.performance_targets = { @@ -513,6 +550,54 @@ class AgentPerformanceMonitor: } return priority_weights.get(priority, 0) + def _build_recommended_action_payload(self, agent_id: str, snapshot: AgentPerformanceSnapshot) -> Dict[str, Any]: + """Build recommended action payload including tier and confidence.""" + tier = 1 + if (snapshot.success_rate <= self.tier_policy_config[3].thresholds["success_rate"] or + snapshot.efficiency_score <= self.tier_policy_config[3].thresholds["efficiency_score"] or + snapshot.average_response_time >= self.tier_policy_config[3].thresholds["response_time"] or + snapshot.market_impact_score <= self.tier_policy_config[3].thresholds["market_impact"]): + tier = 3 + elif (snapshot.success_rate <= self.tier_policy_config[2].thresholds["success_rate"] or + snapshot.efficiency_score <= self.tier_policy_config[2].thresholds["efficiency_score"] or + snapshot.average_response_time >= self.tier_policy_config[2].thresholds["response_time"] or + snapshot.market_impact_score <= self.tier_policy_config[2].thresholds["market_impact"]): + tier = 2 + + confidence = round(max(0.0, min(1.0, 1.0 - abs(0.75 - self._calculate_health_score(snapshot)))) , 2) + policy = self.tier_policy_config[tier] + + return { + "agent_id": agent_id, + "tier": tier, + "confidence": confidence, + "max_iterations": policy.max_iterations, + "lock_criteria": policy.lock_criteria, + "trigger_metrics": policy.trigger_metrics + } + + def _route_tier3_systemic_alert(self, action_payload: Dict[str, Any], alerts: List[Dict[str, Any]]) -> None: + """Route Tier 3 systemic anomalies to alerting subsystem with diagnostic brief.""" + diagnostic_brief = { + "type": "systemic_anomaly", + "severity": "critical", + "tier": 3, + "confidence": action_payload.get("confidence", 0.0), + "agent_id": action_payload.get("agent_id"), + "timestamp": datetime.utcnow().isoformat(), + "diagnostic_brief": { + "trigger_metrics": action_payload.get("trigger_metrics", []), + "alerts": alerts, + "max_iterations": action_payload.get("max_iterations"), + "lock_criteria": action_payload.get("lock_criteria", {}) + } + } + self.systemic_alerts.append(diagnostic_brief) + if len(self.systemic_alerts) > 200: + self.systemic_alerts = self.systemic_alerts[-200:] + logger.critical(f"[ALERTING_SUBSYSTEM] Tier 3 systemic anomaly routed: {json.dumps(diagnostic_brief)}") + + async def get_performance_alerts(self, agent_id: str) -> List[Dict[str, Any]]: """Get performance alerts for an agent""" alerts = [] @@ -574,6 +659,13 @@ class AgentPerformanceMonitor: "timestamp": datetime.utcnow().isoformat() }) + action_payload = self._build_recommended_action_payload(agent_id, snapshot) + if action_payload["tier"] == 3: + self._route_tier3_systemic_alert(action_payload, alerts) + + for alert in alerts: + alert["recommended_action"] = action_payload + return alerts except Exception as e: diff --git a/backend/services/intelligence/agents/safety_framework.py b/backend/services/intelligence/agents/safety_framework.py index cc54902e..7c1d5b86 100644 --- a/backend/services/intelligence/agents/safety_framework.py +++ b/backend/services/intelligence/agents/safety_framework.py @@ -84,6 +84,17 @@ class SafetyValidation: if self.validation_timestamp is None: self.validation_timestamp = datetime.utcnow().isoformat() + +@dataclass +class SafetyArbitrationDecision: + """Explicit allow/deny/lock decision with reasons.""" + decision: str + reasons: List[str] + tier: int + confidence: float + lock_state_active: bool + + class SafetyConstraintManager: """Manages safety constraints for agent actions""" @@ -92,6 +103,8 @@ class SafetyConstraintManager: self.constraints: Dict[str, SafetyConstraint] = {} self.action_history: List[Dict[str, Any]] = [] self.violation_history: List[Dict[str, Any]] = [] + self.lock_state_active: bool = False + self.lock_state_reason: Optional[str] = None # Initialize default constraints self._initialize_default_constraints() @@ -163,6 +176,17 @@ class SafetyConstraintManager: """Validate an action against safety constraints""" try: logger.info(f"Validating action for user {self.user_id}: {action_data.get('action_type', 'unknown')}") + + if self.lock_state_active and action_data.get("autonomous_modification", True): + reason = self.lock_state_reason or "Safety lock is active due to Tier 3 systemic anomaly" + return SafetyValidation( + is_valid=False, + risk_level=RiskLevel.CRITICAL, + violations=["Autonomous modifications blocked while lock state is active"], + recommendations=[reason], + requires_approval=True, + confidence_score=1.0 + ) violations = [] recommendations = [] @@ -207,19 +231,29 @@ class SafetyConstraintManager: # Final validation is_valid = len(violations) == 0 and not requires_approval - - logger.info(f"Action validation completed for user {self.user_id}. Valid: {is_valid}, Risk: {risk_level.value}, Violations: {len(violations)}") - + confidence_score = max(0.0, min(1.0, confidence_score)) + arbitration = self._arbitrate_decision(action_data, risk_level, violations, requires_approval, confidence_score) + + if arbitration.decision == "lock": + self.lock_state_active = True + self.lock_state_reason = "; ".join(arbitration.reasons) + is_valid = False + requires_approval = True + + recommendations.extend([f"Arbitration decision: {arbitration.decision}", *arbitration.reasons]) + + logger.info(f"Action validation completed for user {self.user_id}. Decision: {arbitration.decision}, Valid: {is_valid}, Risk: {risk_level.value}, Violations: {len(violations)}") + # Record in history await self._record_validation_history(action_data, is_valid, violations) - + return SafetyValidation( is_valid=is_valid, risk_level=risk_level, violations=violations, recommendations=recommendations, requires_approval=requires_approval, - confidence_score=max(0.0, min(1.0, confidence_score)) + confidence_score=confidence_score ) except Exception as e: @@ -235,6 +269,30 @@ class SafetyConstraintManager: confidence_score=0.0 ) + def _arbitrate_decision(self, action_data: Dict[str, Any], risk_level: RiskLevel, violations: List[str], requires_approval: bool, confidence_score: float) -> SafetyArbitrationDecision: + """Arbitrate allow/deny/lock with explicit reasons.""" + reasons: List[str] = [] + tier = int(action_data.get("recommended_tier", 1)) + + if self.lock_state_active: + reasons.append("Existing lock state is active") + return SafetyArbitrationDecision("lock", reasons, tier, confidence_score, True) + + if tier >= 3 or risk_level == RiskLevel.CRITICAL: + reasons.append("Tier 3 systemic anomaly or critical risk detected") + if violations: + reasons.extend(violations) + return SafetyArbitrationDecision("lock", reasons, 3, confidence_score, True) + + if violations or requires_approval: + reasons.append("Safety policy violation or approval requirement triggered") + reasons.extend(violations) + return SafetyArbitrationDecision("deny", reasons, tier, confidence_score, False) + + reasons.append("No policy violations detected") + return SafetyArbitrationDecision("allow", reasons, tier, confidence_score, False) + + def _determine_action_category(self, action_type: str) -> ActionCategory: """Determine the category of an action""" action_type_lower = action_type.lower()