Compare commits

..

1 Commits

Author SHA1 Message Date
ي
b0674dfa22 Add velocity-based safety escalation and lockout flow 2026-05-18 15:56:34 +05:30
4 changed files with 173 additions and 115 deletions

View File

@@ -697,39 +697,6 @@ class BaseALwrityAgent(ABC):
"action_id": action.action_id,
"agent_id": self.agent_id,
}
capability_decision = self._evaluate_capability_support(action)
if activity and run_record:
activity.log_event(
event_type="decision",
severity="info" if capability_decision.get("supported", False) else "warning",
message=capability_decision.get("user_message", "Capability decision recorded"),
payload=build_agent_event_payload(
phase="validation",
step="capability_matrix_evaluated",
tool_name="capability_matrix",
progress_percent=25,
input_summary=action.action_type,
output_summary="Supported action" if capability_decision.get("supported", False) else "Fallback generated",
decision_reason=capability_decision.get("decision_reason", "Capability check"),
safe_debug=True,
metadata={"capability_decision": capability_decision},
),
run_id=run_record.id,
agent_type=self.agent_type,
)
if not capability_decision.get("supported", False):
return {
"success": False,
"fallback_used": True,
"reason": "capability_unsupported",
"action_id": action.action_id,
"agent_id": self.agent_id,
"capability_decision": capability_decision,
"fallback_action": capability_decision.get("fallback_action"),
"user_message": capability_decision.get("user_message"),
}
# 2. Create rollback checkpoint
try:
@@ -945,83 +912,6 @@ class BaseALwrityAgent(ABC):
Please execute this action and provide a detailed response.
Consider user goals, safety constraints, and potential impacts.
"""
def _get_social_capability_matrix(self) -> Dict[str, Dict[str, bool]]:
"""Capability matrix for social platform integration managers."""
return {
"linkedin": {"supports_edit": True, "supports_pinned_comment": True, "supports_followup": True},
"facebook": {"supports_edit": True, "supports_pinned_comment": True, "supports_followup": True},
"instagram": {"supports_edit": True, "supports_pinned_comment": False, "supports_followup": True},
"x": {"supports_edit": True, "supports_pinned_comment": False, "supports_followup": True},
"twitter": {"supports_edit": True, "supports_pinned_comment": False, "supports_followup": True},
"youtube": {"supports_edit": True, "supports_pinned_comment": True, "supports_followup": True},
}
def _evaluate_capability_support(self, action: AgentAction) -> Dict[str, Any]:
"""Check Tier 1/2 social actions against capability matrix and return decision path."""
platform = str(action.parameters.get("platform", "")).strip().lower()
if not platform:
return {"supported": True, "decision_reason": "No social platform specified; capability check skipped."}
matrix = self._get_social_capability_matrix()
platform_caps = matrix.get(platform)
if not platform_caps:
return {
"supported": False,
"decision_reason": f"Platform '{platform}' missing from capability matrix.",
"fallback_action": self._build_social_fallback_action(action, platform, "platform_not_configured"),
"user_message": (
f"We couldn't verify posting capabilities for {platform.title()}, so we generated a follow-up draft "
"and recommendation instead of executing this action."
),
}
action_tier = str(action.parameters.get("action_tier", "")).strip().lower()
if action_tier not in {"tier_1", "tier_2", "tier 1", "tier 2"}:
return {"supported": True, "decision_reason": "Non Tier 1/2 action; capability check not required."}
action_type = action.action_type.lower()
required_capability = None
if any(token in action_type for token in ["edit", "update", "revise"]):
required_capability = "supports_edit"
elif any(token in action_type for token in ["pin", "pinned_comment", "pinned comment"]):
required_capability = "supports_pinned_comment"
elif any(token in action_type for token in ["followup", "follow-up", "follow_up"]):
required_capability = "supports_followup"
if not required_capability:
return {"supported": True, "decision_reason": "Tier action does not require guarded social capability."}
supported = bool(platform_caps.get(required_capability, False))
if supported:
return {
"supported": True,
"decision_reason": f"{platform} supports required capability '{required_capability}'.",
"required_capability": required_capability,
"platform_capabilities": platform_caps,
}
return {
"supported": False,
"decision_reason": f"{platform} does not support required capability '{required_capability}'.",
"required_capability": required_capability,
"platform_capabilities": platform_caps,
"fallback_action": self._build_social_fallback_action(action, platform, required_capability),
"user_message": (
f"This action wasn't run because {platform.title()} does not support {required_capability}. "
"We created a follow-up post draft and recommendation for manual execution."
),
}
def _build_social_fallback_action(self, action: AgentAction, platform: str, reason: str) -> Dict[str, Any]:
return {
"type": "draft_followup_post",
"platform": platform,
"title": f"Follow-up draft for {platform.title()}",
"draft": f"Follow-up for original action '{action.action_type}' on {action.target_resource}.",
"recommendation": "Review and publish manually, then notify the team.",
"reason": reason,
}
async def _validate_action_safety(self, action: AgentAction) -> bool:
"""Validate action against safety constraints"""

View File

@@ -99,6 +99,58 @@ class OptimizationRecommendation:
expires = datetime.utcnow().timestamp() + (7 * 24 * 60 * 60)
self.expires_at = datetime.fromtimestamp(expires).isoformat()
@dataclass
class EscalationVelocitySignal:
"""Measured action velocity signal used for escalation tiering."""
window_minutes: int
action_count: int
actions_per_minute: float
triggered: bool
class EscalationTier(Enum):
"""Escalation tier derived from measurable action velocity."""
TIER_1 = "tier_1"
TIER_2 = "tier_2"
TIER_3 = "tier_3"
class EscalationVelocityPolicy:
"""Velocity-based trigger policy for escalation tiers."""
def __init__(self):
self.tier_thresholds = {
EscalationTier.TIER_1: {"window_minutes": 15, "actions_per_minute": 0.8},
EscalationTier.TIER_2: {"window_minutes": 10, "actions_per_minute": 1.5},
EscalationTier.TIER_3: {"window_minutes": 5, "actions_per_minute": 3.0},
}
def measure_velocity(self, events: List[Dict[str, Any]], now: Optional[datetime] = None) -> Dict[EscalationTier, EscalationVelocitySignal]:
now = now or datetime.utcnow()
signals: Dict[EscalationTier, EscalationVelocitySignal] = {}
for tier, cfg in self.tier_thresholds.items():
cutoff = now - timedelta(minutes=cfg["window_minutes"])
count = sum(1 for event in events if datetime.fromisoformat(event["timestamp"]) >= cutoff)
velocity = count / max(cfg["window_minutes"], 1)
signals[tier] = EscalationVelocitySignal(
window_minutes=cfg["window_minutes"],
action_count=count,
actions_per_minute=velocity,
triggered=velocity >= cfg["actions_per_minute"]
)
return signals
def determine_tier(self, events: List[Dict[str, Any]], now: Optional[datetime] = None) -> Tuple[Optional[EscalationTier], Dict[EscalationTier, EscalationVelocitySignal]]:
signals = self.measure_velocity(events, now=now)
for tier in [EscalationTier.TIER_3, EscalationTier.TIER_2, EscalationTier.TIER_1]:
if signals[tier].triggered:
return tier, signals
return None, signals
class AgentPerformanceMonitor:
"""Main performance monitoring system for agents"""

View File

@@ -13,6 +13,7 @@ from enum import Enum
from utils.logger_utils import get_service_logger
from services.database import get_session_for_user
from services.intelligence.agents.performance_monitor import EscalationVelocityPolicy, EscalationTier
logger = get_service_logger(__name__)
@@ -84,6 +85,25 @@ class SafetyValidation:
if self.validation_timestamp is None:
self.validation_timestamp = datetime.utcnow().isoformat()
@dataclass
class EscalationDecision:
"""Structured escalation payload for autonomous safety routing."""
tier: str
action: str
confidence: float
risk_class: str
rationale: str
velocity: Dict[str, Any]
lockout_auto_edits: bool
executor: Optional[str]
created_at: str = None
def __post_init__(self):
if self.created_at is None:
self.created_at = datetime.utcnow().isoformat()
class SafetyConstraintManager:
"""Manages safety constraints for agent actions"""
@@ -92,6 +112,11 @@ class SafetyConstraintManager:
self.constraints: Dict[str, SafetyConstraint] = {}
self.action_history: List[Dict[str, Any]] = []
self.violation_history: List[Dict[str, Any]] = []
self.escalation_policy = EscalationVelocityPolicy()
self.escalation_history: List[Dict[str, Any]] = []
self.auto_edit_lockout = False
self.executor_routes = {"tier_1": "autonomous_guardian_executor", "tier_2": "autonomous_recovery_executor"}
self.alert_history: List[Dict[str, Any]] = []
# Initialize default constraints
self._initialize_default_constraints()
@@ -213,7 +238,7 @@ class SafetyConstraintManager:
# Record in history
await self._record_validation_history(action_data, is_valid, violations)
return SafetyValidation(
validation = SafetyValidation(
is_valid=is_valid,
risk_level=risk_level,
violations=violations,
@@ -221,6 +246,10 @@ class SafetyConstraintManager:
requires_approval=requires_approval,
confidence_score=max(0.0, min(1.0, confidence_score))
)
escalation = await self.evaluate_escalation(action_data, validation)
if escalation:
recommendations.append(f"Escalation action: {escalation.action} ({escalation.tier})")
return validation
except Exception as e:
logger.error(f"Error validating action for user {self.user_id}: {e}")
@@ -466,6 +495,97 @@ class SafetyConstraintManager:
if len(self.violation_history) > 500:
self.violation_history = self.violation_history[-500:]
async def evaluate_escalation(self, action_data: Dict[str, Any], validation: SafetyValidation) -> Optional[EscalationDecision]:
"""Evaluate velocity-triggered escalation and produce structured decision payload."""
if self.auto_edit_lockout:
decision = EscalationDecision(
tier=EscalationTier.TIER_3.value,
action="lockout_enforced",
confidence=1.0,
risk_class=RiskLevel.CRITICAL.value,
rationale="Tier 3 lockout already active; autonomous edits blocked until manual reset",
velocity={},
lockout_auto_edits=True,
executor=None
)
await self._persist_escalation_decision(decision, action_data, outcome={"status": "blocked_by_lockout"})
return decision
tier, signals = self.escalation_policy.determine_tier(self.action_history)
if not tier:
return None
risk_class_map = {EscalationTier.TIER_1: RiskLevel.MEDIUM.value, EscalationTier.TIER_2: RiskLevel.HIGH.value, EscalationTier.TIER_3: RiskLevel.CRITICAL.value}
confidence = min(1.0, max(0.1, 0.55 + (len(validation.violations) * 0.05) + ((1 - validation.confidence_score) * 0.4)))
velocity_signal = signals[tier]
velocity_payload = {
"window_minutes": velocity_signal.window_minutes,
"action_count": velocity_signal.action_count,
"actions_per_minute": round(velocity_signal.actions_per_minute, 4),
"threshold_actions_per_minute": self.escalation_policy.tier_thresholds[tier]["actions_per_minute"],
}
executor = self.executor_routes.get(tier.value)
action = "route_to_autonomous_executor" if tier in (EscalationTier.TIER_1, EscalationTier.TIER_2) else "lockout_autonomous_edits"
rationale = f"{tier.value} triggered by velocity {velocity_payload['actions_per_minute']}/min over {velocity_signal.window_minutes}m window"
decision = EscalationDecision(
tier=tier.value,
action=action,
confidence=round(confidence, 3),
risk_class=risk_class_map[tier],
rationale=rationale,
velocity=velocity_payload,
lockout_auto_edits=(tier == EscalationTier.TIER_3),
executor=executor if tier != EscalationTier.TIER_3 else None
)
outcome = await self._apply_escalation_decision(decision, action_data, validation)
await self._persist_escalation_decision(decision, action_data, outcome=outcome)
return decision
async def _apply_escalation_decision(self, decision: EscalationDecision, action_data: Dict[str, Any], validation: SafetyValidation) -> Dict[str, Any]:
if decision.tier in (EscalationTier.TIER_1.value, EscalationTier.TIER_2.value):
return {
"status": "routed",
"executor": decision.executor,
"reason": decision.rationale
}
self.auto_edit_lockout = True
brief = {
"type": "diagnostic_brief",
"severity": "critical",
"tier": decision.tier,
"user_rationale": "Autonomous edits have been paused to protect account safety after sustained high-velocity actions.",
"validation_violations": validation.violations,
"action_type": action_data.get("action_type", "unknown"),
"timestamp": datetime.utcnow().isoformat()
}
self.alert_history.append(brief)
if len(self.alert_history) > 500:
self.alert_history = self.alert_history[-500:]
return {"status": "lockout_enabled", "diagnostic_brief": brief}
async def _persist_escalation_decision(self, decision: EscalationDecision, action_data: Dict[str, Any], outcome: Dict[str, Any]):
record = {
"timestamp": datetime.utcnow().isoformat(),
"decision": asdict(decision),
"action_data": action_data,
"outcome": outcome
}
self.escalation_history.append(record)
if len(self.escalation_history) > 2000:
self.escalation_history = self.escalation_history[-2000:]
def get_escalation_history(self, limit: int = 100) -> List[Dict[str, Any]]:
return self.escalation_history[-limit:] if self.escalation_history else []
def reset_auto_edit_lockout(self):
self.auto_edit_lockout = False
def add_custom_constraint(self, constraint: SafetyConstraint):
"""Add a custom safety constraint"""
self.constraints[constraint.constraint_id] = constraint

View File

@@ -69,10 +69,6 @@ class SocialAmplificationAgent(BaseALwrityAgent):
# Instruction will be provided via orchestrator context or initial prompt
# Instruction should be provided during invocation or via orchestrator context
)
def get_social_integration_capabilities(self) -> Dict[str, Dict[str, bool]]:
"""Expose platform capability flags used by social integration managers."""
return self._get_social_capability_matrix()
# Tool Implementations