Compare commits
1 Commits
codex/refa
...
codex/upda
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b0674dfa22 |
@@ -99,6 +99,58 @@ class OptimizationRecommendation:
|
||||
expires = datetime.utcnow().timestamp() + (7 * 24 * 60 * 60)
|
||||
self.expires_at = datetime.fromtimestamp(expires).isoformat()
|
||||
|
||||
|
||||
|
||||
@dataclass
|
||||
class EscalationVelocitySignal:
|
||||
"""Measured action velocity signal used for escalation tiering."""
|
||||
window_minutes: int
|
||||
action_count: int
|
||||
actions_per_minute: float
|
||||
triggered: bool
|
||||
|
||||
|
||||
class EscalationTier(Enum):
|
||||
"""Escalation tier derived from measurable action velocity."""
|
||||
TIER_1 = "tier_1"
|
||||
TIER_2 = "tier_2"
|
||||
TIER_3 = "tier_3"
|
||||
|
||||
|
||||
class EscalationVelocityPolicy:
|
||||
"""Velocity-based trigger policy for escalation tiers."""
|
||||
|
||||
def __init__(self):
|
||||
self.tier_thresholds = {
|
||||
EscalationTier.TIER_1: {"window_minutes": 15, "actions_per_minute": 0.8},
|
||||
EscalationTier.TIER_2: {"window_minutes": 10, "actions_per_minute": 1.5},
|
||||
EscalationTier.TIER_3: {"window_minutes": 5, "actions_per_minute": 3.0},
|
||||
}
|
||||
|
||||
def measure_velocity(self, events: List[Dict[str, Any]], now: Optional[datetime] = None) -> Dict[EscalationTier, EscalationVelocitySignal]:
|
||||
now = now or datetime.utcnow()
|
||||
signals: Dict[EscalationTier, EscalationVelocitySignal] = {}
|
||||
|
||||
for tier, cfg in self.tier_thresholds.items():
|
||||
cutoff = now - timedelta(minutes=cfg["window_minutes"])
|
||||
count = sum(1 for event in events if datetime.fromisoformat(event["timestamp"]) >= cutoff)
|
||||
velocity = count / max(cfg["window_minutes"], 1)
|
||||
signals[tier] = EscalationVelocitySignal(
|
||||
window_minutes=cfg["window_minutes"],
|
||||
action_count=count,
|
||||
actions_per_minute=velocity,
|
||||
triggered=velocity >= cfg["actions_per_minute"]
|
||||
)
|
||||
|
||||
return signals
|
||||
|
||||
def determine_tier(self, events: List[Dict[str, Any]], now: Optional[datetime] = None) -> Tuple[Optional[EscalationTier], Dict[EscalationTier, EscalationVelocitySignal]]:
|
||||
signals = self.measure_velocity(events, now=now)
|
||||
for tier in [EscalationTier.TIER_3, EscalationTier.TIER_2, EscalationTier.TIER_1]:
|
||||
if signals[tier].triggered:
|
||||
return tier, signals
|
||||
return None, signals
|
||||
|
||||
class AgentPerformanceMonitor:
|
||||
"""Main performance monitoring system for agents"""
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ from enum import Enum
|
||||
|
||||
from utils.logger_utils import get_service_logger
|
||||
from services.database import get_session_for_user
|
||||
from services.intelligence.agents.performance_monitor import EscalationVelocityPolicy, EscalationTier
|
||||
|
||||
logger = get_service_logger(__name__)
|
||||
|
||||
@@ -84,6 +85,25 @@ class SafetyValidation:
|
||||
if self.validation_timestamp is None:
|
||||
self.validation_timestamp = datetime.utcnow().isoformat()
|
||||
|
||||
|
||||
|
||||
@dataclass
|
||||
class EscalationDecision:
|
||||
"""Structured escalation payload for autonomous safety routing."""
|
||||
tier: str
|
||||
action: str
|
||||
confidence: float
|
||||
risk_class: str
|
||||
rationale: str
|
||||
velocity: Dict[str, Any]
|
||||
lockout_auto_edits: bool
|
||||
executor: Optional[str]
|
||||
created_at: str = None
|
||||
|
||||
def __post_init__(self):
|
||||
if self.created_at is None:
|
||||
self.created_at = datetime.utcnow().isoformat()
|
||||
|
||||
class SafetyConstraintManager:
|
||||
"""Manages safety constraints for agent actions"""
|
||||
|
||||
@@ -92,6 +112,11 @@ class SafetyConstraintManager:
|
||||
self.constraints: Dict[str, SafetyConstraint] = {}
|
||||
self.action_history: List[Dict[str, Any]] = []
|
||||
self.violation_history: List[Dict[str, Any]] = []
|
||||
self.escalation_policy = EscalationVelocityPolicy()
|
||||
self.escalation_history: List[Dict[str, Any]] = []
|
||||
self.auto_edit_lockout = False
|
||||
self.executor_routes = {"tier_1": "autonomous_guardian_executor", "tier_2": "autonomous_recovery_executor"}
|
||||
self.alert_history: List[Dict[str, Any]] = []
|
||||
|
||||
# Initialize default constraints
|
||||
self._initialize_default_constraints()
|
||||
@@ -213,7 +238,7 @@ class SafetyConstraintManager:
|
||||
# Record in history
|
||||
await self._record_validation_history(action_data, is_valid, violations)
|
||||
|
||||
return SafetyValidation(
|
||||
validation = SafetyValidation(
|
||||
is_valid=is_valid,
|
||||
risk_level=risk_level,
|
||||
violations=violations,
|
||||
@@ -221,6 +246,10 @@ class SafetyConstraintManager:
|
||||
requires_approval=requires_approval,
|
||||
confidence_score=max(0.0, min(1.0, confidence_score))
|
||||
)
|
||||
escalation = await self.evaluate_escalation(action_data, validation)
|
||||
if escalation:
|
||||
recommendations.append(f"Escalation action: {escalation.action} ({escalation.tier})")
|
||||
return validation
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error validating action for user {self.user_id}: {e}")
|
||||
@@ -466,6 +495,97 @@ class SafetyConstraintManager:
|
||||
if len(self.violation_history) > 500:
|
||||
self.violation_history = self.violation_history[-500:]
|
||||
|
||||
async def evaluate_escalation(self, action_data: Dict[str, Any], validation: SafetyValidation) -> Optional[EscalationDecision]:
|
||||
"""Evaluate velocity-triggered escalation and produce structured decision payload."""
|
||||
if self.auto_edit_lockout:
|
||||
decision = EscalationDecision(
|
||||
tier=EscalationTier.TIER_3.value,
|
||||
action="lockout_enforced",
|
||||
confidence=1.0,
|
||||
risk_class=RiskLevel.CRITICAL.value,
|
||||
rationale="Tier 3 lockout already active; autonomous edits blocked until manual reset",
|
||||
velocity={},
|
||||
lockout_auto_edits=True,
|
||||
executor=None
|
||||
)
|
||||
await self._persist_escalation_decision(decision, action_data, outcome={"status": "blocked_by_lockout"})
|
||||
return decision
|
||||
|
||||
tier, signals = self.escalation_policy.determine_tier(self.action_history)
|
||||
if not tier:
|
||||
return None
|
||||
|
||||
risk_class_map = {EscalationTier.TIER_1: RiskLevel.MEDIUM.value, EscalationTier.TIER_2: RiskLevel.HIGH.value, EscalationTier.TIER_3: RiskLevel.CRITICAL.value}
|
||||
confidence = min(1.0, max(0.1, 0.55 + (len(validation.violations) * 0.05) + ((1 - validation.confidence_score) * 0.4)))
|
||||
|
||||
velocity_signal = signals[tier]
|
||||
velocity_payload = {
|
||||
"window_minutes": velocity_signal.window_minutes,
|
||||
"action_count": velocity_signal.action_count,
|
||||
"actions_per_minute": round(velocity_signal.actions_per_minute, 4),
|
||||
"threshold_actions_per_minute": self.escalation_policy.tier_thresholds[tier]["actions_per_minute"],
|
||||
}
|
||||
|
||||
executor = self.executor_routes.get(tier.value)
|
||||
action = "route_to_autonomous_executor" if tier in (EscalationTier.TIER_1, EscalationTier.TIER_2) else "lockout_autonomous_edits"
|
||||
rationale = f"{tier.value} triggered by velocity {velocity_payload['actions_per_minute']}/min over {velocity_signal.window_minutes}m window"
|
||||
|
||||
decision = EscalationDecision(
|
||||
tier=tier.value,
|
||||
action=action,
|
||||
confidence=round(confidence, 3),
|
||||
risk_class=risk_class_map[tier],
|
||||
rationale=rationale,
|
||||
velocity=velocity_payload,
|
||||
lockout_auto_edits=(tier == EscalationTier.TIER_3),
|
||||
executor=executor if tier != EscalationTier.TIER_3 else None
|
||||
)
|
||||
|
||||
outcome = await self._apply_escalation_decision(decision, action_data, validation)
|
||||
await self._persist_escalation_decision(decision, action_data, outcome=outcome)
|
||||
return decision
|
||||
|
||||
async def _apply_escalation_decision(self, decision: EscalationDecision, action_data: Dict[str, Any], validation: SafetyValidation) -> Dict[str, Any]:
|
||||
if decision.tier in (EscalationTier.TIER_1.value, EscalationTier.TIER_2.value):
|
||||
return {
|
||||
"status": "routed",
|
||||
"executor": decision.executor,
|
||||
"reason": decision.rationale
|
||||
}
|
||||
|
||||
self.auto_edit_lockout = True
|
||||
brief = {
|
||||
"type": "diagnostic_brief",
|
||||
"severity": "critical",
|
||||
"tier": decision.tier,
|
||||
"user_rationale": "Autonomous edits have been paused to protect account safety after sustained high-velocity actions.",
|
||||
"validation_violations": validation.violations,
|
||||
"action_type": action_data.get("action_type", "unknown"),
|
||||
"timestamp": datetime.utcnow().isoformat()
|
||||
}
|
||||
self.alert_history.append(brief)
|
||||
if len(self.alert_history) > 500:
|
||||
self.alert_history = self.alert_history[-500:]
|
||||
|
||||
return {"status": "lockout_enabled", "diagnostic_brief": brief}
|
||||
|
||||
async def _persist_escalation_decision(self, decision: EscalationDecision, action_data: Dict[str, Any], outcome: Dict[str, Any]):
|
||||
record = {
|
||||
"timestamp": datetime.utcnow().isoformat(),
|
||||
"decision": asdict(decision),
|
||||
"action_data": action_data,
|
||||
"outcome": outcome
|
||||
}
|
||||
self.escalation_history.append(record)
|
||||
if len(self.escalation_history) > 2000:
|
||||
self.escalation_history = self.escalation_history[-2000:]
|
||||
|
||||
def get_escalation_history(self, limit: int = 100) -> List[Dict[str, Any]]:
|
||||
return self.escalation_history[-limit:] if self.escalation_history else []
|
||||
|
||||
def reset_auto_edit_lockout(self):
|
||||
self.auto_edit_lockout = False
|
||||
|
||||
def add_custom_constraint(self, constraint: SafetyConstraint):
|
||||
"""Add a custom safety constraint"""
|
||||
self.constraints[constraint.constraint_id] = constraint
|
||||
|
||||
Reference in New Issue
Block a user