Add velocity-based safety escalation and lockout flow
This commit is contained in:
@@ -99,6 +99,58 @@ class OptimizationRecommendation:
|
||||
expires = datetime.utcnow().timestamp() + (7 * 24 * 60 * 60)
|
||||
self.expires_at = datetime.fromtimestamp(expires).isoformat()
|
||||
|
||||
|
||||
|
||||
@dataclass
|
||||
class EscalationVelocitySignal:
|
||||
"""Measured action velocity signal used for escalation tiering."""
|
||||
window_minutes: int
|
||||
action_count: int
|
||||
actions_per_minute: float
|
||||
triggered: bool
|
||||
|
||||
|
||||
class EscalationTier(Enum):
|
||||
"""Escalation tier derived from measurable action velocity."""
|
||||
TIER_1 = "tier_1"
|
||||
TIER_2 = "tier_2"
|
||||
TIER_3 = "tier_3"
|
||||
|
||||
|
||||
class EscalationVelocityPolicy:
|
||||
"""Velocity-based trigger policy for escalation tiers."""
|
||||
|
||||
def __init__(self):
|
||||
self.tier_thresholds = {
|
||||
EscalationTier.TIER_1: {"window_minutes": 15, "actions_per_minute": 0.8},
|
||||
EscalationTier.TIER_2: {"window_minutes": 10, "actions_per_minute": 1.5},
|
||||
EscalationTier.TIER_3: {"window_minutes": 5, "actions_per_minute": 3.0},
|
||||
}
|
||||
|
||||
def measure_velocity(self, events: List[Dict[str, Any]], now: Optional[datetime] = None) -> Dict[EscalationTier, EscalationVelocitySignal]:
|
||||
now = now or datetime.utcnow()
|
||||
signals: Dict[EscalationTier, EscalationVelocitySignal] = {}
|
||||
|
||||
for tier, cfg in self.tier_thresholds.items():
|
||||
cutoff = now - timedelta(minutes=cfg["window_minutes"])
|
||||
count = sum(1 for event in events if datetime.fromisoformat(event["timestamp"]) >= cutoff)
|
||||
velocity = count / max(cfg["window_minutes"], 1)
|
||||
signals[tier] = EscalationVelocitySignal(
|
||||
window_minutes=cfg["window_minutes"],
|
||||
action_count=count,
|
||||
actions_per_minute=velocity,
|
||||
triggered=velocity >= cfg["actions_per_minute"]
|
||||
)
|
||||
|
||||
return signals
|
||||
|
||||
def determine_tier(self, events: List[Dict[str, Any]], now: Optional[datetime] = None) -> Tuple[Optional[EscalationTier], Dict[EscalationTier, EscalationVelocitySignal]]:
|
||||
signals = self.measure_velocity(events, now=now)
|
||||
for tier in [EscalationTier.TIER_3, EscalationTier.TIER_2, EscalationTier.TIER_1]:
|
||||
if signals[tier].triggered:
|
||||
return tier, signals
|
||||
return None, signals
|
||||
|
||||
class AgentPerformanceMonitor:
|
||||
"""Main performance monitoring system for agents"""
|
||||
|
||||
|
||||
Reference in New Issue
Block a user