Compare commits

..

1 Commits

Author SHA1 Message Date
ي
87925c8fdc Add tiered anomaly policy and safety lock arbitration 2026-05-18 16:01:43 +05:30
3 changed files with 155 additions and 276 deletions

View File

@@ -99,6 +99,17 @@ class OptimizationRecommendation:
expires = datetime.utcnow().timestamp() + (7 * 24 * 60 * 60)
self.expires_at = datetime.fromtimestamp(expires).isoformat()
@dataclass
class TierPolicyConfig:
"""Structured policy for anomaly tiers and remediation controls"""
tier: int
trigger_metrics: List[str]
thresholds: Dict[str, float]
max_iterations: int
lock_criteria: Dict[str, Any]
class AgentPerformanceMonitor:
"""Main performance monitoring system for agents"""
@@ -108,6 +119,32 @@ class AgentPerformanceMonitor:
self.agent_snapshots: Dict[str, AgentPerformanceSnapshot] = {}
self.recommendations: List[OptimizationRecommendation] = []
self.performance_history: deque = deque(maxlen=1000) # Keep last 1000 data points
self.systemic_alerts: List[Dict[str, Any]] = []
# Structured tier policy config
self.tier_policy_config: Dict[int, TierPolicyConfig] = {
1: TierPolicyConfig(
tier=1,
trigger_metrics=["success_rate", "efficiency_score", "response_time"],
thresholds={"success_rate": 0.80, "efficiency_score": 0.65, "response_time": 45.0},
max_iterations=3,
lock_criteria={"min_confidence": 0.85, "consecutive_failures": 6}
),
2: TierPolicyConfig(
tier=2,
trigger_metrics=["success_rate", "efficiency_score", "response_time", "market_impact"],
thresholds={"success_rate": 0.70, "efficiency_score": 0.50, "response_time": 60.0, "market_impact": 0.35},
max_iterations=2,
lock_criteria={"min_confidence": 0.75, "consecutive_failures": 4}
),
3: TierPolicyConfig(
tier=3,
trigger_metrics=["success_rate", "efficiency_score", "response_time", "market_impact"],
thresholds={"success_rate": 0.55, "efficiency_score": 0.35, "response_time": 90.0, "market_impact": 0.25},
max_iterations=1,
lock_criteria={"min_confidence": 0.65, "consecutive_failures": 3}
)
}
# Performance thresholds and targets
self.performance_targets = {
@@ -513,6 +550,54 @@ class AgentPerformanceMonitor:
}
return priority_weights.get(priority, 0)
def _build_recommended_action_payload(self, agent_id: str, snapshot: AgentPerformanceSnapshot) -> Dict[str, Any]:
"""Build recommended action payload including tier and confidence."""
tier = 1
if (snapshot.success_rate <= self.tier_policy_config[3].thresholds["success_rate"] or
snapshot.efficiency_score <= self.tier_policy_config[3].thresholds["efficiency_score"] or
snapshot.average_response_time >= self.tier_policy_config[3].thresholds["response_time"] or
snapshot.market_impact_score <= self.tier_policy_config[3].thresholds["market_impact"]):
tier = 3
elif (snapshot.success_rate <= self.tier_policy_config[2].thresholds["success_rate"] or
snapshot.efficiency_score <= self.tier_policy_config[2].thresholds["efficiency_score"] or
snapshot.average_response_time >= self.tier_policy_config[2].thresholds["response_time"] or
snapshot.market_impact_score <= self.tier_policy_config[2].thresholds["market_impact"]):
tier = 2
confidence = round(max(0.0, min(1.0, 1.0 - abs(0.75 - self._calculate_health_score(snapshot)))) , 2)
policy = self.tier_policy_config[tier]
return {
"agent_id": agent_id,
"tier": tier,
"confidence": confidence,
"max_iterations": policy.max_iterations,
"lock_criteria": policy.lock_criteria,
"trigger_metrics": policy.trigger_metrics
}
def _route_tier3_systemic_alert(self, action_payload: Dict[str, Any], alerts: List[Dict[str, Any]]) -> None:
"""Route Tier 3 systemic anomalies to alerting subsystem with diagnostic brief."""
diagnostic_brief = {
"type": "systemic_anomaly",
"severity": "critical",
"tier": 3,
"confidence": action_payload.get("confidence", 0.0),
"agent_id": action_payload.get("agent_id"),
"timestamp": datetime.utcnow().isoformat(),
"diagnostic_brief": {
"trigger_metrics": action_payload.get("trigger_metrics", []),
"alerts": alerts,
"max_iterations": action_payload.get("max_iterations"),
"lock_criteria": action_payload.get("lock_criteria", {})
}
}
self.systemic_alerts.append(diagnostic_brief)
if len(self.systemic_alerts) > 200:
self.systemic_alerts = self.systemic_alerts[-200:]
logger.critical(f"[ALERTING_SUBSYSTEM] Tier 3 systemic anomaly routed: {json.dumps(diagnostic_brief)}")
async def get_performance_alerts(self, agent_id: str) -> List[Dict[str, Any]]:
"""Get performance alerts for an agent"""
alerts = []
@@ -574,6 +659,13 @@ class AgentPerformanceMonitor:
"timestamp": datetime.utcnow().isoformat()
})
action_payload = self._build_recommended_action_payload(agent_id, snapshot)
if action_payload["tier"] == 3:
self._route_tier3_systemic_alert(action_payload, alerts)
for alert in alerts:
alert["recommended_action"] = action_payload
return alerts
except Exception as e:

View File

@@ -84,6 +84,17 @@ class SafetyValidation:
if self.validation_timestamp is None:
self.validation_timestamp = datetime.utcnow().isoformat()
@dataclass
class SafetyArbitrationDecision:
"""Explicit allow/deny/lock decision with reasons."""
decision: str
reasons: List[str]
tier: int
confidence: float
lock_state_active: bool
class SafetyConstraintManager:
"""Manages safety constraints for agent actions"""
@@ -92,6 +103,8 @@ class SafetyConstraintManager:
self.constraints: Dict[str, SafetyConstraint] = {}
self.action_history: List[Dict[str, Any]] = []
self.violation_history: List[Dict[str, Any]] = []
self.lock_state_active: bool = False
self.lock_state_reason: Optional[str] = None
# Initialize default constraints
self._initialize_default_constraints()
@@ -163,6 +176,17 @@ class SafetyConstraintManager:
"""Validate an action against safety constraints"""
try:
logger.info(f"Validating action for user {self.user_id}: {action_data.get('action_type', 'unknown')}")
if self.lock_state_active and action_data.get("autonomous_modification", True):
reason = self.lock_state_reason or "Safety lock is active due to Tier 3 systemic anomaly"
return SafetyValidation(
is_valid=False,
risk_level=RiskLevel.CRITICAL,
violations=["Autonomous modifications blocked while lock state is active"],
recommendations=[reason],
requires_approval=True,
confidence_score=1.0
)
violations = []
recommendations = []
@@ -207,19 +231,29 @@ class SafetyConstraintManager:
# Final validation
is_valid = len(violations) == 0 and not requires_approval
logger.info(f"Action validation completed for user {self.user_id}. Valid: {is_valid}, Risk: {risk_level.value}, Violations: {len(violations)}")
confidence_score = max(0.0, min(1.0, confidence_score))
arbitration = self._arbitrate_decision(action_data, risk_level, violations, requires_approval, confidence_score)
if arbitration.decision == "lock":
self.lock_state_active = True
self.lock_state_reason = "; ".join(arbitration.reasons)
is_valid = False
requires_approval = True
recommendations.extend([f"Arbitration decision: {arbitration.decision}", *arbitration.reasons])
logger.info(f"Action validation completed for user {self.user_id}. Decision: {arbitration.decision}, Valid: {is_valid}, Risk: {risk_level.value}, Violations: {len(violations)}")
# Record in history
await self._record_validation_history(action_data, is_valid, violations)
return SafetyValidation(
is_valid=is_valid,
risk_level=risk_level,
violations=violations,
recommendations=recommendations,
requires_approval=requires_approval,
confidence_score=max(0.0, min(1.0, confidence_score))
confidence_score=confidence_score
)
except Exception as e:
@@ -235,6 +269,30 @@ class SafetyConstraintManager:
confidence_score=0.0
)
def _arbitrate_decision(self, action_data: Dict[str, Any], risk_level: RiskLevel, violations: List[str], requires_approval: bool, confidence_score: float) -> SafetyArbitrationDecision:
"""Arbitrate allow/deny/lock with explicit reasons."""
reasons: List[str] = []
tier = int(action_data.get("recommended_tier", 1))
if self.lock_state_active:
reasons.append("Existing lock state is active")
return SafetyArbitrationDecision("lock", reasons, tier, confidence_score, True)
if tier >= 3 or risk_level == RiskLevel.CRITICAL:
reasons.append("Tier 3 systemic anomaly or critical risk detected")
if violations:
reasons.extend(violations)
return SafetyArbitrationDecision("lock", reasons, 3, confidence_score, True)
if violations or requires_approval:
reasons.append("Safety policy violation or approval requirement triggered")
reasons.extend(violations)
return SafetyArbitrationDecision("deny", reasons, tier, confidence_score, False)
reasons.append("No policy violations detected")
return SafetyArbitrationDecision("allow", reasons, tier, confidence_score, False)
def _determine_action_category(self, action_type: str) -> ActionCategory:
"""Determine the category of an action"""
action_type_lower = action_type.lower()

View File

@@ -1,271 +0,0 @@
"""Self-healing executor for social post engagement recovery.
Implements:
- Per-post evaluation windows and cooldown timers
- Stagnation trigger evaluation with tiered action selection
- Action idempotency keys for edit/comment/thread operations
- Duplicate and over-frequency suppression within cooldown boundaries
- Outcome persistence and safe retry policy for transient failures
"""
from __future__ import annotations
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta, timezone
from enum import Enum
import hashlib
import json
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
class ActionType(str, Enum):
EDIT = "edit"
COMMENT = "comment"
THREAD = "thread"
class ActionTier(str, Enum):
TIER_1 = "tier_1" # low-intensity nudge (comment)
TIER_2 = "tier_2" # medium-intensity enhancement (edit)
TIER_3 = "tier_3" # high-intensity amplification (thread)
SAFE_TRANSIENT_ERROR_CODES = {
"timeout",
"rate_limit",
"service_unavailable",
"network_error",
}
@dataclass
class EvaluationConfig:
per_post_window_minutes: int = 90
min_samples_required: int = 3
cooldown_by_action_seconds: Dict[ActionType, int] = field(
default_factory=lambda: {
ActionType.COMMENT: 30 * 60,
ActionType.EDIT: 2 * 60 * 60,
ActionType.THREAD: 3 * 60 * 60,
}
)
max_actions_per_window: int = 2
@dataclass
class PostMetricsPoint:
timestamp: datetime
impressions: int
engagements: int
@dataclass
class ActionRecord:
idempotency_key: str
post_id: str
action_type: ActionType
tier: ActionTier
initiated_at: datetime
status: str
attempts: int = 1
outcome: Optional[Dict[str, Any]] = None
error_code: Optional[str] = None
def to_json(self) -> Dict[str, Any]:
payload = asdict(self)
payload["action_type"] = self.action_type.value
payload["tier"] = self.tier.value
payload["initiated_at"] = self.initiated_at.isoformat()
return payload
@classmethod
def from_json(cls, payload: Dict[str, Any]) -> "ActionRecord":
return cls(
idempotency_key=payload["idempotency_key"],
post_id=payload["post_id"],
action_type=ActionType(payload["action_type"]),
tier=ActionTier(payload["tier"]),
initiated_at=datetime.fromisoformat(payload["initiated_at"]),
status=payload["status"],
attempts=payload.get("attempts", 1),
outcome=payload.get("outcome"),
error_code=payload.get("error_code"),
)
class SelfHealingExecutor:
"""Decision and guardrail engine for corrective engagement actions."""
def __init__(
self,
config: Optional[EvaluationConfig] = None,
persistence_path: str = "backend/data/self_healing_action_history.json",
) -> None:
self.config = config or EvaluationConfig()
self.persistence_path = Path(persistence_path)
self._history: List[ActionRecord] = self._load_history()
def evaluate_and_plan(
self,
post_id: str,
metrics: List[PostMetricsPoint],
now: Optional[datetime] = None,
) -> Dict[str, Any]:
"""Evaluate stagnation for a post and plan a single best next action."""
now = now or datetime.now(timezone.utc)
window_metrics = self._filter_window(metrics, now)
if len(window_metrics) < self.config.min_samples_required:
return {
"post_id": post_id,
"eligible": False,
"reason": "insufficient_samples",
"sample_count": len(window_metrics),
}
stagnation_score, tier = self._evaluate_stagnation(window_metrics)
action_type = self._choose_action_type(tier)
idempotency_key = self.generate_idempotency_key(post_id, action_type, tier)
if self._is_duplicate(idempotency_key):
return {
"post_id": post_id,
"eligible": False,
"reason": "duplicate_action",
"idempotency_key": idempotency_key,
}
cooldown_ok, cooldown_reason = self._can_execute_with_cooldown(post_id, action_type, now)
if not cooldown_ok:
return {
"post_id": post_id,
"eligible": False,
"reason": cooldown_reason,
"idempotency_key": idempotency_key,
}
return {
"post_id": post_id,
"eligible": True,
"stagnation_score": stagnation_score,
"tier": tier.value,
"action_type": action_type.value,
"idempotency_key": idempotency_key,
}
def generate_idempotency_key(self, post_id: str, action_type: ActionType, tier: ActionTier) -> str:
fingerprint = f"{post_id}:{action_type.value}:{tier.value}".encode("utf-8")
digest = hashlib.sha256(fingerprint).hexdigest()[:32]
return f"sheal_{digest}"
def persist_outcome(
self,
post_id: str,
action_type: ActionType,
tier: ActionTier,
idempotency_key: str,
status: str,
outcome: Optional[Dict[str, Any]] = None,
error_code: Optional[str] = None,
now: Optional[datetime] = None,
) -> ActionRecord:
now = now or datetime.now(timezone.utc)
existing = next((h for h in self._history if h.idempotency_key == idempotency_key), None)
if existing:
existing.status = status
existing.outcome = outcome
existing.error_code = error_code
existing.attempts += 1
existing.initiated_at = now
record = existing
else:
record = ActionRecord(
idempotency_key=idempotency_key,
post_id=post_id,
action_type=action_type,
tier=tier,
initiated_at=now,
status=status,
outcome=outcome,
error_code=error_code,
)
self._history.append(record)
self._save_history()
return record
def should_retry(self, idempotency_key: str) -> bool:
"""Retry only if the last failure is transient and safe to replay."""
rec = next((h for h in self._history if h.idempotency_key == idempotency_key), None)
if not rec or rec.status != "failed":
return False
if rec.error_code not in SAFE_TRANSIENT_ERROR_CODES:
return False
return rec.action_type in {ActionType.COMMENT, ActionType.EDIT, ActionType.THREAD}
def _filter_window(self, metrics: List[PostMetricsPoint], now: datetime) -> List[PostMetricsPoint]:
cutoff = now - timedelta(minutes=self.config.per_post_window_minutes)
return [m for m in metrics if m.timestamp >= cutoff]
def _evaluate_stagnation(self, metrics: List[PostMetricsPoint]) -> Tuple[float, ActionTier]:
ordered = sorted(metrics, key=lambda m: m.timestamp)
first, last = ordered[0], ordered[-1]
imp_delta = max(0, last.impressions - first.impressions)
eng_delta = max(0, last.engagements - first.engagements)
eng_rate = eng_delta / imp_delta if imp_delta > 0 else 0.0
stagnation_score = 1.0 - min(1.0, eng_rate * 20)
if stagnation_score >= 0.8:
return stagnation_score, ActionTier.TIER_3
if stagnation_score >= 0.55:
return stagnation_score, ActionTier.TIER_2
return stagnation_score, ActionTier.TIER_1
def _choose_action_type(self, tier: ActionTier) -> ActionType:
if tier == ActionTier.TIER_1:
return ActionType.COMMENT
if tier == ActionTier.TIER_2:
return ActionType.EDIT
return ActionType.THREAD
def _is_duplicate(self, idempotency_key: str) -> bool:
return any(h.idempotency_key == idempotency_key and h.status in {"success", "running"} for h in self._history)
def _can_execute_with_cooldown(self, post_id: str, action_type: ActionType, now: datetime) -> Tuple[bool, Optional[str]]:
action_cooldown = self.config.cooldown_by_action_seconds[action_type]
same_post = [h for h in self._history if h.post_id == post_id]
recent_in_window = [
h for h in same_post
if h.initiated_at >= now - timedelta(minutes=self.config.per_post_window_minutes)
]
if len(recent_in_window) >= self.config.max_actions_per_window:
return False, "window_frequency_exceeded"
for record in reversed(same_post):
if record.action_type != action_type:
continue
if (now - record.initiated_at).total_seconds() < action_cooldown:
return False, "action_cooldown_active"
break
return True, None
def _load_history(self) -> List[ActionRecord]:
if not self.persistence_path.exists():
return []
try:
payload = json.loads(self.persistence_path.read_text(encoding="utf-8"))
return [ActionRecord.from_json(item) for item in payload]
except (json.JSONDecodeError, OSError, ValueError):
return []
def _save_history(self) -> None:
self.persistence_path.parent.mkdir(parents=True, exist_ok=True)
payload = [item.to_json() for item in self._history]
self.persistence_path.write_text(json.dumps(payload, indent=2), encoding="utf-8")