Compare commits

..

1 Commits

Author SHA1 Message Date
ي
b0674dfa22 Add velocity-based safety escalation and lockout flow 2026-05-18 15:56:34 +05:30
3 changed files with 173 additions and 272 deletions

View File

@@ -99,6 +99,58 @@ class OptimizationRecommendation:
expires = datetime.utcnow().timestamp() + (7 * 24 * 60 * 60)
self.expires_at = datetime.fromtimestamp(expires).isoformat()
@dataclass
class EscalationVelocitySignal:
"""Measured action velocity signal used for escalation tiering."""
window_minutes: int
action_count: int
actions_per_minute: float
triggered: bool
class EscalationTier(Enum):
"""Escalation tier derived from measurable action velocity."""
TIER_1 = "tier_1"
TIER_2 = "tier_2"
TIER_3 = "tier_3"
class EscalationVelocityPolicy:
"""Velocity-based trigger policy for escalation tiers."""
def __init__(self):
self.tier_thresholds = {
EscalationTier.TIER_1: {"window_minutes": 15, "actions_per_minute": 0.8},
EscalationTier.TIER_2: {"window_minutes": 10, "actions_per_minute": 1.5},
EscalationTier.TIER_3: {"window_minutes": 5, "actions_per_minute": 3.0},
}
def measure_velocity(self, events: List[Dict[str, Any]], now: Optional[datetime] = None) -> Dict[EscalationTier, EscalationVelocitySignal]:
now = now or datetime.utcnow()
signals: Dict[EscalationTier, EscalationVelocitySignal] = {}
for tier, cfg in self.tier_thresholds.items():
cutoff = now - timedelta(minutes=cfg["window_minutes"])
count = sum(1 for event in events if datetime.fromisoformat(event["timestamp"]) >= cutoff)
velocity = count / max(cfg["window_minutes"], 1)
signals[tier] = EscalationVelocitySignal(
window_minutes=cfg["window_minutes"],
action_count=count,
actions_per_minute=velocity,
triggered=velocity >= cfg["actions_per_minute"]
)
return signals
def determine_tier(self, events: List[Dict[str, Any]], now: Optional[datetime] = None) -> Tuple[Optional[EscalationTier], Dict[EscalationTier, EscalationVelocitySignal]]:
signals = self.measure_velocity(events, now=now)
for tier in [EscalationTier.TIER_3, EscalationTier.TIER_2, EscalationTier.TIER_1]:
if signals[tier].triggered:
return tier, signals
return None, signals
class AgentPerformanceMonitor:
"""Main performance monitoring system for agents"""

View File

@@ -13,6 +13,7 @@ from enum import Enum
from utils.logger_utils import get_service_logger
from services.database import get_session_for_user
from services.intelligence.agents.performance_monitor import EscalationVelocityPolicy, EscalationTier
logger = get_service_logger(__name__)
@@ -84,6 +85,25 @@ class SafetyValidation:
if self.validation_timestamp is None:
self.validation_timestamp = datetime.utcnow().isoformat()
@dataclass
class EscalationDecision:
"""Structured escalation payload for autonomous safety routing."""
tier: str
action: str
confidence: float
risk_class: str
rationale: str
velocity: Dict[str, Any]
lockout_auto_edits: bool
executor: Optional[str]
created_at: str = None
def __post_init__(self):
if self.created_at is None:
self.created_at = datetime.utcnow().isoformat()
class SafetyConstraintManager:
"""Manages safety constraints for agent actions"""
@@ -92,6 +112,11 @@ class SafetyConstraintManager:
self.constraints: Dict[str, SafetyConstraint] = {}
self.action_history: List[Dict[str, Any]] = []
self.violation_history: List[Dict[str, Any]] = []
self.escalation_policy = EscalationVelocityPolicy()
self.escalation_history: List[Dict[str, Any]] = []
self.auto_edit_lockout = False
self.executor_routes = {"tier_1": "autonomous_guardian_executor", "tier_2": "autonomous_recovery_executor"}
self.alert_history: List[Dict[str, Any]] = []
# Initialize default constraints
self._initialize_default_constraints()
@@ -213,7 +238,7 @@ class SafetyConstraintManager:
# Record in history
await self._record_validation_history(action_data, is_valid, violations)
return SafetyValidation(
validation = SafetyValidation(
is_valid=is_valid,
risk_level=risk_level,
violations=violations,
@@ -221,6 +246,10 @@ class SafetyConstraintManager:
requires_approval=requires_approval,
confidence_score=max(0.0, min(1.0, confidence_score))
)
escalation = await self.evaluate_escalation(action_data, validation)
if escalation:
recommendations.append(f"Escalation action: {escalation.action} ({escalation.tier})")
return validation
except Exception as e:
logger.error(f"Error validating action for user {self.user_id}: {e}")
@@ -466,6 +495,97 @@ class SafetyConstraintManager:
if len(self.violation_history) > 500:
self.violation_history = self.violation_history[-500:]
async def evaluate_escalation(self, action_data: Dict[str, Any], validation: SafetyValidation) -> Optional[EscalationDecision]:
"""Evaluate velocity-triggered escalation and produce structured decision payload."""
if self.auto_edit_lockout:
decision = EscalationDecision(
tier=EscalationTier.TIER_3.value,
action="lockout_enforced",
confidence=1.0,
risk_class=RiskLevel.CRITICAL.value,
rationale="Tier 3 lockout already active; autonomous edits blocked until manual reset",
velocity={},
lockout_auto_edits=True,
executor=None
)
await self._persist_escalation_decision(decision, action_data, outcome={"status": "blocked_by_lockout"})
return decision
tier, signals = self.escalation_policy.determine_tier(self.action_history)
if not tier:
return None
risk_class_map = {EscalationTier.TIER_1: RiskLevel.MEDIUM.value, EscalationTier.TIER_2: RiskLevel.HIGH.value, EscalationTier.TIER_3: RiskLevel.CRITICAL.value}
confidence = min(1.0, max(0.1, 0.55 + (len(validation.violations) * 0.05) + ((1 - validation.confidence_score) * 0.4)))
velocity_signal = signals[tier]
velocity_payload = {
"window_minutes": velocity_signal.window_minutes,
"action_count": velocity_signal.action_count,
"actions_per_minute": round(velocity_signal.actions_per_minute, 4),
"threshold_actions_per_minute": self.escalation_policy.tier_thresholds[tier]["actions_per_minute"],
}
executor = self.executor_routes.get(tier.value)
action = "route_to_autonomous_executor" if tier in (EscalationTier.TIER_1, EscalationTier.TIER_2) else "lockout_autonomous_edits"
rationale = f"{tier.value} triggered by velocity {velocity_payload['actions_per_minute']}/min over {velocity_signal.window_minutes}m window"
decision = EscalationDecision(
tier=tier.value,
action=action,
confidence=round(confidence, 3),
risk_class=risk_class_map[tier],
rationale=rationale,
velocity=velocity_payload,
lockout_auto_edits=(tier == EscalationTier.TIER_3),
executor=executor if tier != EscalationTier.TIER_3 else None
)
outcome = await self._apply_escalation_decision(decision, action_data, validation)
await self._persist_escalation_decision(decision, action_data, outcome=outcome)
return decision
async def _apply_escalation_decision(self, decision: EscalationDecision, action_data: Dict[str, Any], validation: SafetyValidation) -> Dict[str, Any]:
if decision.tier in (EscalationTier.TIER_1.value, EscalationTier.TIER_2.value):
return {
"status": "routed",
"executor": decision.executor,
"reason": decision.rationale
}
self.auto_edit_lockout = True
brief = {
"type": "diagnostic_brief",
"severity": "critical",
"tier": decision.tier,
"user_rationale": "Autonomous edits have been paused to protect account safety after sustained high-velocity actions.",
"validation_violations": validation.violations,
"action_type": action_data.get("action_type", "unknown"),
"timestamp": datetime.utcnow().isoformat()
}
self.alert_history.append(brief)
if len(self.alert_history) > 500:
self.alert_history = self.alert_history[-500:]
return {"status": "lockout_enabled", "diagnostic_brief": brief}
async def _persist_escalation_decision(self, decision: EscalationDecision, action_data: Dict[str, Any], outcome: Dict[str, Any]):
record = {
"timestamp": datetime.utcnow().isoformat(),
"decision": asdict(decision),
"action_data": action_data,
"outcome": outcome
}
self.escalation_history.append(record)
if len(self.escalation_history) > 2000:
self.escalation_history = self.escalation_history[-2000:]
def get_escalation_history(self, limit: int = 100) -> List[Dict[str, Any]]:
return self.escalation_history[-limit:] if self.escalation_history else []
def reset_auto_edit_lockout(self):
self.auto_edit_lockout = False
def add_custom_constraint(self, constraint: SafetyConstraint):
"""Add a custom safety constraint"""
self.constraints[constraint.constraint_id] = constraint

View File

@@ -1,271 +0,0 @@
"""Self-healing executor for social post engagement recovery.
Implements:
- Per-post evaluation windows and cooldown timers
- Stagnation trigger evaluation with tiered action selection
- Action idempotency keys for edit/comment/thread operations
- Duplicate and over-frequency suppression within cooldown boundaries
- Outcome persistence and safe retry policy for transient failures
"""
from __future__ import annotations
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta, timezone
from enum import Enum
import hashlib
import json
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
class ActionType(str, Enum):
EDIT = "edit"
COMMENT = "comment"
THREAD = "thread"
class ActionTier(str, Enum):
TIER_1 = "tier_1" # low-intensity nudge (comment)
TIER_2 = "tier_2" # medium-intensity enhancement (edit)
TIER_3 = "tier_3" # high-intensity amplification (thread)
SAFE_TRANSIENT_ERROR_CODES = {
"timeout",
"rate_limit",
"service_unavailable",
"network_error",
}
@dataclass
class EvaluationConfig:
per_post_window_minutes: int = 90
min_samples_required: int = 3
cooldown_by_action_seconds: Dict[ActionType, int] = field(
default_factory=lambda: {
ActionType.COMMENT: 30 * 60,
ActionType.EDIT: 2 * 60 * 60,
ActionType.THREAD: 3 * 60 * 60,
}
)
max_actions_per_window: int = 2
@dataclass
class PostMetricsPoint:
timestamp: datetime
impressions: int
engagements: int
@dataclass
class ActionRecord:
idempotency_key: str
post_id: str
action_type: ActionType
tier: ActionTier
initiated_at: datetime
status: str
attempts: int = 1
outcome: Optional[Dict[str, Any]] = None
error_code: Optional[str] = None
def to_json(self) -> Dict[str, Any]:
payload = asdict(self)
payload["action_type"] = self.action_type.value
payload["tier"] = self.tier.value
payload["initiated_at"] = self.initiated_at.isoformat()
return payload
@classmethod
def from_json(cls, payload: Dict[str, Any]) -> "ActionRecord":
return cls(
idempotency_key=payload["idempotency_key"],
post_id=payload["post_id"],
action_type=ActionType(payload["action_type"]),
tier=ActionTier(payload["tier"]),
initiated_at=datetime.fromisoformat(payload["initiated_at"]),
status=payload["status"],
attempts=payload.get("attempts", 1),
outcome=payload.get("outcome"),
error_code=payload.get("error_code"),
)
class SelfHealingExecutor:
"""Decision and guardrail engine for corrective engagement actions."""
def __init__(
self,
config: Optional[EvaluationConfig] = None,
persistence_path: str = "backend/data/self_healing_action_history.json",
) -> None:
self.config = config or EvaluationConfig()
self.persistence_path = Path(persistence_path)
self._history: List[ActionRecord] = self._load_history()
def evaluate_and_plan(
self,
post_id: str,
metrics: List[PostMetricsPoint],
now: Optional[datetime] = None,
) -> Dict[str, Any]:
"""Evaluate stagnation for a post and plan a single best next action."""
now = now or datetime.now(timezone.utc)
window_metrics = self._filter_window(metrics, now)
if len(window_metrics) < self.config.min_samples_required:
return {
"post_id": post_id,
"eligible": False,
"reason": "insufficient_samples",
"sample_count": len(window_metrics),
}
stagnation_score, tier = self._evaluate_stagnation(window_metrics)
action_type = self._choose_action_type(tier)
idempotency_key = self.generate_idempotency_key(post_id, action_type, tier)
if self._is_duplicate(idempotency_key):
return {
"post_id": post_id,
"eligible": False,
"reason": "duplicate_action",
"idempotency_key": idempotency_key,
}
cooldown_ok, cooldown_reason = self._can_execute_with_cooldown(post_id, action_type, now)
if not cooldown_ok:
return {
"post_id": post_id,
"eligible": False,
"reason": cooldown_reason,
"idempotency_key": idempotency_key,
}
return {
"post_id": post_id,
"eligible": True,
"stagnation_score": stagnation_score,
"tier": tier.value,
"action_type": action_type.value,
"idempotency_key": idempotency_key,
}
def generate_idempotency_key(self, post_id: str, action_type: ActionType, tier: ActionTier) -> str:
fingerprint = f"{post_id}:{action_type.value}:{tier.value}".encode("utf-8")
digest = hashlib.sha256(fingerprint).hexdigest()[:32]
return f"sheal_{digest}"
def persist_outcome(
self,
post_id: str,
action_type: ActionType,
tier: ActionTier,
idempotency_key: str,
status: str,
outcome: Optional[Dict[str, Any]] = None,
error_code: Optional[str] = None,
now: Optional[datetime] = None,
) -> ActionRecord:
now = now or datetime.now(timezone.utc)
existing = next((h for h in self._history if h.idempotency_key == idempotency_key), None)
if existing:
existing.status = status
existing.outcome = outcome
existing.error_code = error_code
existing.attempts += 1
existing.initiated_at = now
record = existing
else:
record = ActionRecord(
idempotency_key=idempotency_key,
post_id=post_id,
action_type=action_type,
tier=tier,
initiated_at=now,
status=status,
outcome=outcome,
error_code=error_code,
)
self._history.append(record)
self._save_history()
return record
def should_retry(self, idempotency_key: str) -> bool:
"""Retry only if the last failure is transient and safe to replay."""
rec = next((h for h in self._history if h.idempotency_key == idempotency_key), None)
if not rec or rec.status != "failed":
return False
if rec.error_code not in SAFE_TRANSIENT_ERROR_CODES:
return False
return rec.action_type in {ActionType.COMMENT, ActionType.EDIT, ActionType.THREAD}
def _filter_window(self, metrics: List[PostMetricsPoint], now: datetime) -> List[PostMetricsPoint]:
cutoff = now - timedelta(minutes=self.config.per_post_window_minutes)
return [m for m in metrics if m.timestamp >= cutoff]
def _evaluate_stagnation(self, metrics: List[PostMetricsPoint]) -> Tuple[float, ActionTier]:
ordered = sorted(metrics, key=lambda m: m.timestamp)
first, last = ordered[0], ordered[-1]
imp_delta = max(0, last.impressions - first.impressions)
eng_delta = max(0, last.engagements - first.engagements)
eng_rate = eng_delta / imp_delta if imp_delta > 0 else 0.0
stagnation_score = 1.0 - min(1.0, eng_rate * 20)
if stagnation_score >= 0.8:
return stagnation_score, ActionTier.TIER_3
if stagnation_score >= 0.55:
return stagnation_score, ActionTier.TIER_2
return stagnation_score, ActionTier.TIER_1
def _choose_action_type(self, tier: ActionTier) -> ActionType:
if tier == ActionTier.TIER_1:
return ActionType.COMMENT
if tier == ActionTier.TIER_2:
return ActionType.EDIT
return ActionType.THREAD
def _is_duplicate(self, idempotency_key: str) -> bool:
return any(h.idempotency_key == idempotency_key and h.status in {"success", "running"} for h in self._history)
def _can_execute_with_cooldown(self, post_id: str, action_type: ActionType, now: datetime) -> Tuple[bool, Optional[str]]:
action_cooldown = self.config.cooldown_by_action_seconds[action_type]
same_post = [h for h in self._history if h.post_id == post_id]
recent_in_window = [
h for h in same_post
if h.initiated_at >= now - timedelta(minutes=self.config.per_post_window_minutes)
]
if len(recent_in_window) >= self.config.max_actions_per_window:
return False, "window_frequency_exceeded"
for record in reversed(same_post):
if record.action_type != action_type:
continue
if (now - record.initiated_at).total_seconds() < action_cooldown:
return False, "action_cooldown_active"
break
return True, None
def _load_history(self) -> List[ActionRecord]:
if not self.persistence_path.exists():
return []
try:
payload = json.loads(self.persistence_path.read_text(encoding="utf-8"))
return [ActionRecord.from_json(item) for item in payload]
except (json.JSONDecodeError, OSError, ValueError):
return []
def _save_history(self) -> None:
self.persistence_path.parent.mkdir(parents=True, exist_ok=True)
payload = [item.to_json() for item in self._history]
self.persistence_path.write_text(json.dumps(payload, indent=2), encoding="utf-8")