Compare commits

..

1 Commits

Author SHA1 Message Date
ي
23489fdc12 Add flat-context synthesis and mnemonic prompt injection 2026-05-18 16:00:56 +05:30
3 changed files with 109 additions and 174 deletions

View File

@@ -101,6 +101,7 @@ class AgentContextVFS:
"/steps/integrations": AgentFlatContextStore.STEP5_FILENAME,
}
HIGH_SIGNAL_MARKERS = ("agent_summary", "high_signal_terms", "quick_facts", "context_type")
LOW_CONFIDENCE_MARKER = "low_confidence"
def __init__(self, user_id: str, project_id: Optional[str] = None):
self.user_id = user_id
@@ -294,6 +295,101 @@ class AgentContextVFS:
)
return ranked[: max(1, top_k)]
@staticmethod
def _mnemonic_token(result: Dict[str, Any], rank: int) -> str:
"""Create compressed mnemonic token with source reference."""
path = str(result.get("path") or "unknown")
reason = str(result.get("reason") or "match")
confidence = float(result.get("confidence") or 0.0)
low_flag = "!" if result.get(AgentContextVFS.LOW_CONFIDENCE_MARKER) else ""
src = path.replace(".json", "").replace("_", "-")[:28]
hint = reason.replace(" ", "-")[:20]
return f"M{rank}:{src}|{hint}|c{confidence:.2f}{low_flag}"
@staticmethod
def _detect_contradictions(results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Detect contradictory learnings by path with conflicting reasons/relevance classes."""
by_path: Dict[str, List[Dict[str, Any]]] = {}
for item in results:
p = str(item.get("path") or "")
by_path.setdefault(p, []).append(item)
contradictions: List[Dict[str, Any]] = []
for path, rows in by_path.items():
reasons = {str(r.get("reason") or "").strip().lower() for r in rows}
relevance = {str(r.get("relevance") or "").strip().lower() for r in rows}
# contradictory if both high/supported or mixed summary/body signals in same source cluster
if len(reasons) > 1 and len(relevance) > 1:
contradictions.append(
{
"path": path,
"reason_variants": sorted([r for r in reasons if r]),
"relevance_variants": sorted([r for r in relevance if r]),
"count": len(rows),
}
)
return contradictions
def _run_synthesis_pipeline(
self, ranked_results: List[Dict[str, Any]], *, char_budget: int = 1200, top_k: int = 5
) -> Dict[str, Any]:
"""
Flat-context synthesis pipeline:
1) Compress telemetry into mnemonic tokens with source references
2) Detect contradictions and mark low-confidence heuristics
3) Select top-ranked, budget-fitting tokens for prompt injection
4) Persist synthesis + source lineage for explainability
"""
contradictions = self._detect_contradictions(ranked_results)
contradiction_paths = {c["path"] for c in contradictions}
normalized: List[Dict[str, Any]] = []
for idx, item in enumerate(ranked_results, start=1):
row = dict(item)
low_conf = bool(row.get("low_probability")) or (str(row.get("path") or "") in contradiction_paths)
row[self.LOW_CONFIDENCE_MARKER] = low_conf
if low_conf:
row["confidence"] = round(max(0.05, float(row.get("confidence", 0.0)) * 0.7), 3)
row["mnemonic_token"] = self._mnemonic_token(row, idx)
normalized.append(row)
chosen: List[Dict[str, Any]] = []
used = 0
for row in normalized[: max(1, top_k * 3)]:
token = str(row.get("mnemonic_token") or "")
cost = len(token) + 8
if chosen and used + cost > char_budget:
continue
chosen.append(row)
used += cost
if len(chosen) >= top_k:
break
synthesis = {
"created_at": datetime.now(timezone.utc).isoformat(),
"top_k": top_k,
"char_budget": char_budget,
"char_budget_used": used,
"selected_mnemonics": [c.get("mnemonic_token") for c in chosen],
"source_lineage": [
{
"mnemonic_token": c.get("mnemonic_token"),
"path": c.get("path"),
"reason": c.get("reason"),
"confidence": c.get("confidence"),
"low_confidence": c.get(self.LOW_CONFIDENCE_MARKER, False),
}
for c in chosen
],
"contradictions": contradictions,
}
self.append_activity_log(
event_type="flat_context_synthesis",
actor="agent_context_vfs",
details=synthesis,
)
return {"ranked_results": normalized, "synthesis": synthesis}
@staticmethod
def _resolve_json_path(data: Any, path_query: str) -> Any:
"""Resolve dot/bracket JSON path such as 'data.seo_audit.recommendations[0]'."""
@@ -518,15 +614,26 @@ class AgentContextVFS:
bounded_results.append(r)
used += cost
synthesis_bundle = self._run_synthesis_pipeline(
self._static_triage(bounded_results, normalized),
char_budget=1200,
top_k=5,
)
triaged_results = synthesis_bundle["ranked_results"]
synthesis = synthesis_bundle["synthesis"]
result = {
"query": normalized,
"attempted_queries": attempted_queries,
"matched_files_count": len(matched_files),
"results": self._static_triage(bounded_results, normalized),
"results": triaged_results,
"notice": notice,
"char_budget_used": used,
"can_answer": bool(bounded_results),
"synthesis": synthesis,
"prompt_context_mnemonics": synthesis.get("selected_mnemonics", []),
}
# Top-ranked, budget-fitting mnemonic tokens are the only ones intended for prompt context injection.
result["triage_top5"] = self._llm_router_stub(result["results"], top_k=5)
logger.info(
f"[vfs_audit] user={self.store.safe_user_id} action=search_context query={normalized!r} results={len(result['results'])}"

View File

@@ -99,58 +99,6 @@ class OptimizationRecommendation:
expires = datetime.utcnow().timestamp() + (7 * 24 * 60 * 60)
self.expires_at = datetime.fromtimestamp(expires).isoformat()
@dataclass
class EscalationVelocitySignal:
"""Measured action velocity signal used for escalation tiering."""
window_minutes: int
action_count: int
actions_per_minute: float
triggered: bool
class EscalationTier(Enum):
"""Escalation tier derived from measurable action velocity."""
TIER_1 = "tier_1"
TIER_2 = "tier_2"
TIER_3 = "tier_3"
class EscalationVelocityPolicy:
"""Velocity-based trigger policy for escalation tiers."""
def __init__(self):
self.tier_thresholds = {
EscalationTier.TIER_1: {"window_minutes": 15, "actions_per_minute": 0.8},
EscalationTier.TIER_2: {"window_minutes": 10, "actions_per_minute": 1.5},
EscalationTier.TIER_3: {"window_minutes": 5, "actions_per_minute": 3.0},
}
def measure_velocity(self, events: List[Dict[str, Any]], now: Optional[datetime] = None) -> Dict[EscalationTier, EscalationVelocitySignal]:
now = now or datetime.utcnow()
signals: Dict[EscalationTier, EscalationVelocitySignal] = {}
for tier, cfg in self.tier_thresholds.items():
cutoff = now - timedelta(minutes=cfg["window_minutes"])
count = sum(1 for event in events if datetime.fromisoformat(event["timestamp"]) >= cutoff)
velocity = count / max(cfg["window_minutes"], 1)
signals[tier] = EscalationVelocitySignal(
window_minutes=cfg["window_minutes"],
action_count=count,
actions_per_minute=velocity,
triggered=velocity >= cfg["actions_per_minute"]
)
return signals
def determine_tier(self, events: List[Dict[str, Any]], now: Optional[datetime] = None) -> Tuple[Optional[EscalationTier], Dict[EscalationTier, EscalationVelocitySignal]]:
signals = self.measure_velocity(events, now=now)
for tier in [EscalationTier.TIER_3, EscalationTier.TIER_2, EscalationTier.TIER_1]:
if signals[tier].triggered:
return tier, signals
return None, signals
class AgentPerformanceMonitor:
"""Main performance monitoring system for agents"""

View File

@@ -13,7 +13,6 @@ from enum import Enum
from utils.logger_utils import get_service_logger
from services.database import get_session_for_user
from services.intelligence.agents.performance_monitor import EscalationVelocityPolicy, EscalationTier
logger = get_service_logger(__name__)
@@ -85,25 +84,6 @@ class SafetyValidation:
if self.validation_timestamp is None:
self.validation_timestamp = datetime.utcnow().isoformat()
@dataclass
class EscalationDecision:
"""Structured escalation payload for autonomous safety routing."""
tier: str
action: str
confidence: float
risk_class: str
rationale: str
velocity: Dict[str, Any]
lockout_auto_edits: bool
executor: Optional[str]
created_at: str = None
def __post_init__(self):
if self.created_at is None:
self.created_at = datetime.utcnow().isoformat()
class SafetyConstraintManager:
"""Manages safety constraints for agent actions"""
@@ -112,11 +92,6 @@ class SafetyConstraintManager:
self.constraints: Dict[str, SafetyConstraint] = {}
self.action_history: List[Dict[str, Any]] = []
self.violation_history: List[Dict[str, Any]] = []
self.escalation_policy = EscalationVelocityPolicy()
self.escalation_history: List[Dict[str, Any]] = []
self.auto_edit_lockout = False
self.executor_routes = {"tier_1": "autonomous_guardian_executor", "tier_2": "autonomous_recovery_executor"}
self.alert_history: List[Dict[str, Any]] = []
# Initialize default constraints
self._initialize_default_constraints()
@@ -238,7 +213,7 @@ class SafetyConstraintManager:
# Record in history
await self._record_validation_history(action_data, is_valid, violations)
validation = SafetyValidation(
return SafetyValidation(
is_valid=is_valid,
risk_level=risk_level,
violations=violations,
@@ -246,10 +221,6 @@ class SafetyConstraintManager:
requires_approval=requires_approval,
confidence_score=max(0.0, min(1.0, confidence_score))
)
escalation = await self.evaluate_escalation(action_data, validation)
if escalation:
recommendations.append(f"Escalation action: {escalation.action} ({escalation.tier})")
return validation
except Exception as e:
logger.error(f"Error validating action for user {self.user_id}: {e}")
@@ -495,97 +466,6 @@ class SafetyConstraintManager:
if len(self.violation_history) > 500:
self.violation_history = self.violation_history[-500:]
async def evaluate_escalation(self, action_data: Dict[str, Any], validation: SafetyValidation) -> Optional[EscalationDecision]:
"""Evaluate velocity-triggered escalation and produce structured decision payload."""
if self.auto_edit_lockout:
decision = EscalationDecision(
tier=EscalationTier.TIER_3.value,
action="lockout_enforced",
confidence=1.0,
risk_class=RiskLevel.CRITICAL.value,
rationale="Tier 3 lockout already active; autonomous edits blocked until manual reset",
velocity={},
lockout_auto_edits=True,
executor=None
)
await self._persist_escalation_decision(decision, action_data, outcome={"status": "blocked_by_lockout"})
return decision
tier, signals = self.escalation_policy.determine_tier(self.action_history)
if not tier:
return None
risk_class_map = {EscalationTier.TIER_1: RiskLevel.MEDIUM.value, EscalationTier.TIER_2: RiskLevel.HIGH.value, EscalationTier.TIER_3: RiskLevel.CRITICAL.value}
confidence = min(1.0, max(0.1, 0.55 + (len(validation.violations) * 0.05) + ((1 - validation.confidence_score) * 0.4)))
velocity_signal = signals[tier]
velocity_payload = {
"window_minutes": velocity_signal.window_minutes,
"action_count": velocity_signal.action_count,
"actions_per_minute": round(velocity_signal.actions_per_minute, 4),
"threshold_actions_per_minute": self.escalation_policy.tier_thresholds[tier]["actions_per_minute"],
}
executor = self.executor_routes.get(tier.value)
action = "route_to_autonomous_executor" if tier in (EscalationTier.TIER_1, EscalationTier.TIER_2) else "lockout_autonomous_edits"
rationale = f"{tier.value} triggered by velocity {velocity_payload['actions_per_minute']}/min over {velocity_signal.window_minutes}m window"
decision = EscalationDecision(
tier=tier.value,
action=action,
confidence=round(confidence, 3),
risk_class=risk_class_map[tier],
rationale=rationale,
velocity=velocity_payload,
lockout_auto_edits=(tier == EscalationTier.TIER_3),
executor=executor if tier != EscalationTier.TIER_3 else None
)
outcome = await self._apply_escalation_decision(decision, action_data, validation)
await self._persist_escalation_decision(decision, action_data, outcome=outcome)
return decision
async def _apply_escalation_decision(self, decision: EscalationDecision, action_data: Dict[str, Any], validation: SafetyValidation) -> Dict[str, Any]:
if decision.tier in (EscalationTier.TIER_1.value, EscalationTier.TIER_2.value):
return {
"status": "routed",
"executor": decision.executor,
"reason": decision.rationale
}
self.auto_edit_lockout = True
brief = {
"type": "diagnostic_brief",
"severity": "critical",
"tier": decision.tier,
"user_rationale": "Autonomous edits have been paused to protect account safety after sustained high-velocity actions.",
"validation_violations": validation.violations,
"action_type": action_data.get("action_type", "unknown"),
"timestamp": datetime.utcnow().isoformat()
}
self.alert_history.append(brief)
if len(self.alert_history) > 500:
self.alert_history = self.alert_history[-500:]
return {"status": "lockout_enabled", "diagnostic_brief": brief}
async def _persist_escalation_decision(self, decision: EscalationDecision, action_data: Dict[str, Any], outcome: Dict[str, Any]):
record = {
"timestamp": datetime.utcnow().isoformat(),
"decision": asdict(decision),
"action_data": action_data,
"outcome": outcome
}
self.escalation_history.append(record)
if len(self.escalation_history) > 2000:
self.escalation_history = self.escalation_history[-2000:]
def get_escalation_history(self, limit: int = 100) -> List[Dict[str, Any]]:
return self.escalation_history[-limit:] if self.escalation_history else []
def reset_auto_edit_lockout(self):
self.auto_edit_lockout = False
def add_custom_constraint(self, constraint: SafetyConstraint):
"""Add a custom safety constraint"""
self.constraints[constraint.constraint_id] = constraint