Compare commits

..

1 Commits

Author SHA1 Message Date
ي
fb75377d37 Add OAuth social proxy callback binding and reconnect handling 2026-05-18 15:57:22 +05:30
3 changed files with 187 additions and 155 deletions

View File

@@ -0,0 +1,182 @@
from __future__ import annotations
import json
from datetime import datetime, timezone
from typing import Optional
from urllib.parse import urlencode
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import RedirectResponse
from loguru import logger
from sqlalchemy import text
from sqlalchemy.orm import Session
from services.database import get_db
router = APIRouter(prefix="/v1/social-proxy", tags=["social-proxy"])
def _utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _ensure_tables(db: Session) -> None:
# Keep this router backward-compatible on tenant DBs without migrations.
db.execute(text("""
CREATE TABLE IF NOT EXISTS oauth_nonce_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
state TEXT NOT NULL UNIQUE,
nonce TEXT NOT NULL,
user_id TEXT NOT NULL,
platform TEXT NOT NULL,
channel_id INTEGER,
consumed_at TEXT,
expires_at TEXT,
created_at TEXT NOT NULL
)
"""))
db.execute(text("""
CREATE TABLE IF NOT EXISTS social_channels (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
platform TEXT NOT NULL,
platform_account_id TEXT NOT NULL,
token_bundle TEXT NOT NULL,
token_version INTEGER NOT NULL DEFAULT 1,
publication_linkage TEXT,
is_connected INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(platform, platform_account_id)
)
"""))
def _build_redirect(base_url: str, code: str, message: str, channel_id: Optional[int] = None) -> RedirectResponse:
params = {"code": code, "message": message}
if channel_id is not None:
params["channel_id"] = str(channel_id)
return RedirectResponse(url=f"{base_url}?{urlencode(params)}", status_code=303)
@router.get("/oauth/callback")
def oauth_callback(
state: str = Query(...),
platform: str = Query(...),
account_id: str = Query(...),
token_bundle: str = Query(..., description="Serialized token payload"),
ui_redirect: str = Query("/dashboard/connections"),
db: Session = Depends(get_db),
):
"""Consume OAuth callback, bind to user/platform, and upsert social channel connection."""
_ensure_tables(db)
record = db.execute(
text("""
SELECT id, nonce, user_id, platform, channel_id, consumed_at, expires_at
FROM oauth_nonce_sessions WHERE state = :state
"""),
{"state": state},
).mappings().first()
if not record:
return _build_redirect(ui_redirect, "invalid_state", "Missing OAuth session")
if record["consumed_at"] is not None:
return _build_redirect(ui_redirect, "state_reused", "OAuth state already consumed")
if record["platform"] != platform:
return _build_redirect(ui_redirect, "platform_mismatch", "Platform mismatch")
if record["expires_at"] and record["expires_at"] < _utc_now_iso():
return _build_redirect(ui_redirect, "state_expired", "OAuth session expired")
user_id = record["user_id"]
# Validate token payload is JSON.
try:
parsed_bundle = json.loads(token_bundle)
except json.JSONDecodeError as exc:
raise HTTPException(status_code=400, detail="Invalid token_bundle JSON") from exc
now = _utc_now_iso()
existing = db.execute(
text("""
SELECT id, publication_linkage, token_version
FROM social_channels
WHERE platform = :platform AND platform_account_id = :account_id
"""),
{"platform": platform, "account_id": account_id},
).mappings().first()
if existing:
# Reconnect path: preserve publication linkage and bump token version.
db.execute(
text("""
UPDATE social_channels
SET user_id = :user_id,
token_bundle = :token_bundle,
token_version = :token_version,
is_connected = 1,
updated_at = :updated_at
WHERE id = :id
"""),
{
"id": existing["id"],
"user_id": user_id,
"token_bundle": json.dumps(parsed_bundle),
"token_version": int(existing["token_version"] or 0) + 1,
"updated_at": now,
},
)
channel_id = existing["id"]
result_code = "reconnected"
result_message = "Channel reconnected"
else:
db.execute(
text("""
INSERT INTO social_channels (
user_id, platform, platform_account_id, token_bundle,
token_version, publication_linkage, is_connected, created_at, updated_at
) VALUES (
:user_id, :platform, :account_id, :token_bundle,
1, :publication_linkage, 1, :created_at, :updated_at
)
"""),
{
"user_id": user_id,
"platform": platform,
"account_id": account_id,
"token_bundle": json.dumps(parsed_bundle),
"publication_linkage": None,
"created_at": now,
"updated_at": now,
},
)
channel_id = db.execute(text("SELECT last_insert_rowid()")).scalar_one()
result_code = "connected"
result_message = "Channel connected"
# Bind callback session to concrete channel/user/platform and mark consumed.
db.execute(
text("""
UPDATE oauth_nonce_sessions
SET consumed_at = :consumed_at,
channel_id = :channel_id,
user_id = :user_id,
platform = :platform
WHERE id = :id
"""),
{
"id": record["id"],
"consumed_at": now,
"channel_id": channel_id,
"user_id": user_id,
"platform": platform,
},
)
db.commit()
logger.info(f"OAuth callback complete user={user_id} platform={platform} channel_id={channel_id}")
return _build_redirect(ui_redirect, result_code, result_message, channel_id)

View File

@@ -99,17 +99,6 @@ class OptimizationRecommendation:
expires = datetime.utcnow().timestamp() + (7 * 24 * 60 * 60)
self.expires_at = datetime.fromtimestamp(expires).isoformat()
@dataclass
class TierPolicyConfig:
"""Structured policy for anomaly tiers and remediation controls"""
tier: int
trigger_metrics: List[str]
thresholds: Dict[str, float]
max_iterations: int
lock_criteria: Dict[str, Any]
class AgentPerformanceMonitor:
"""Main performance monitoring system for agents"""
@@ -119,32 +108,6 @@ class AgentPerformanceMonitor:
self.agent_snapshots: Dict[str, AgentPerformanceSnapshot] = {}
self.recommendations: List[OptimizationRecommendation] = []
self.performance_history: deque = deque(maxlen=1000) # Keep last 1000 data points
self.systemic_alerts: List[Dict[str, Any]] = []
# Structured tier policy config
self.tier_policy_config: Dict[int, TierPolicyConfig] = {
1: TierPolicyConfig(
tier=1,
trigger_metrics=["success_rate", "efficiency_score", "response_time"],
thresholds={"success_rate": 0.80, "efficiency_score": 0.65, "response_time": 45.0},
max_iterations=3,
lock_criteria={"min_confidence": 0.85, "consecutive_failures": 6}
),
2: TierPolicyConfig(
tier=2,
trigger_metrics=["success_rate", "efficiency_score", "response_time", "market_impact"],
thresholds={"success_rate": 0.70, "efficiency_score": 0.50, "response_time": 60.0, "market_impact": 0.35},
max_iterations=2,
lock_criteria={"min_confidence": 0.75, "consecutive_failures": 4}
),
3: TierPolicyConfig(
tier=3,
trigger_metrics=["success_rate", "efficiency_score", "response_time", "market_impact"],
thresholds={"success_rate": 0.55, "efficiency_score": 0.35, "response_time": 90.0, "market_impact": 0.25},
max_iterations=1,
lock_criteria={"min_confidence": 0.65, "consecutive_failures": 3}
)
}
# Performance thresholds and targets
self.performance_targets = {
@@ -550,54 +513,6 @@ class AgentPerformanceMonitor:
}
return priority_weights.get(priority, 0)
def _build_recommended_action_payload(self, agent_id: str, snapshot: AgentPerformanceSnapshot) -> Dict[str, Any]:
"""Build recommended action payload including tier and confidence."""
tier = 1
if (snapshot.success_rate <= self.tier_policy_config[3].thresholds["success_rate"] or
snapshot.efficiency_score <= self.tier_policy_config[3].thresholds["efficiency_score"] or
snapshot.average_response_time >= self.tier_policy_config[3].thresholds["response_time"] or
snapshot.market_impact_score <= self.tier_policy_config[3].thresholds["market_impact"]):
tier = 3
elif (snapshot.success_rate <= self.tier_policy_config[2].thresholds["success_rate"] or
snapshot.efficiency_score <= self.tier_policy_config[2].thresholds["efficiency_score"] or
snapshot.average_response_time >= self.tier_policy_config[2].thresholds["response_time"] or
snapshot.market_impact_score <= self.tier_policy_config[2].thresholds["market_impact"]):
tier = 2
confidence = round(max(0.0, min(1.0, 1.0 - abs(0.75 - self._calculate_health_score(snapshot)))) , 2)
policy = self.tier_policy_config[tier]
return {
"agent_id": agent_id,
"tier": tier,
"confidence": confidence,
"max_iterations": policy.max_iterations,
"lock_criteria": policy.lock_criteria,
"trigger_metrics": policy.trigger_metrics
}
def _route_tier3_systemic_alert(self, action_payload: Dict[str, Any], alerts: List[Dict[str, Any]]) -> None:
"""Route Tier 3 systemic anomalies to alerting subsystem with diagnostic brief."""
diagnostic_brief = {
"type": "systemic_anomaly",
"severity": "critical",
"tier": 3,
"confidence": action_payload.get("confidence", 0.0),
"agent_id": action_payload.get("agent_id"),
"timestamp": datetime.utcnow().isoformat(),
"diagnostic_brief": {
"trigger_metrics": action_payload.get("trigger_metrics", []),
"alerts": alerts,
"max_iterations": action_payload.get("max_iterations"),
"lock_criteria": action_payload.get("lock_criteria", {})
}
}
self.systemic_alerts.append(diagnostic_brief)
if len(self.systemic_alerts) > 200:
self.systemic_alerts = self.systemic_alerts[-200:]
logger.critical(f"[ALERTING_SUBSYSTEM] Tier 3 systemic anomaly routed: {json.dumps(diagnostic_brief)}")
async def get_performance_alerts(self, agent_id: str) -> List[Dict[str, Any]]:
"""Get performance alerts for an agent"""
alerts = []
@@ -659,13 +574,6 @@ class AgentPerformanceMonitor:
"timestamp": datetime.utcnow().isoformat()
})
action_payload = self._build_recommended_action_payload(agent_id, snapshot)
if action_payload["tier"] == 3:
self._route_tier3_systemic_alert(action_payload, alerts)
for alert in alerts:
alert["recommended_action"] = action_payload
return alerts
except Exception as e:

View File

@@ -84,17 +84,6 @@ class SafetyValidation:
if self.validation_timestamp is None:
self.validation_timestamp = datetime.utcnow().isoformat()
@dataclass
class SafetyArbitrationDecision:
"""Explicit allow/deny/lock decision with reasons."""
decision: str
reasons: List[str]
tier: int
confidence: float
lock_state_active: bool
class SafetyConstraintManager:
"""Manages safety constraints for agent actions"""
@@ -103,8 +92,6 @@ class SafetyConstraintManager:
self.constraints: Dict[str, SafetyConstraint] = {}
self.action_history: List[Dict[str, Any]] = []
self.violation_history: List[Dict[str, Any]] = []
self.lock_state_active: bool = False
self.lock_state_reason: Optional[str] = None
# Initialize default constraints
self._initialize_default_constraints()
@@ -176,17 +163,6 @@ class SafetyConstraintManager:
"""Validate an action against safety constraints"""
try:
logger.info(f"Validating action for user {self.user_id}: {action_data.get('action_type', 'unknown')}")
if self.lock_state_active and action_data.get("autonomous_modification", True):
reason = self.lock_state_reason or "Safety lock is active due to Tier 3 systemic anomaly"
return SafetyValidation(
is_valid=False,
risk_level=RiskLevel.CRITICAL,
violations=["Autonomous modifications blocked while lock state is active"],
recommendations=[reason],
requires_approval=True,
confidence_score=1.0
)
violations = []
recommendations = []
@@ -231,29 +207,19 @@ class SafetyConstraintManager:
# Final validation
is_valid = len(violations) == 0 and not requires_approval
confidence_score = max(0.0, min(1.0, confidence_score))
arbitration = self._arbitrate_decision(action_data, risk_level, violations, requires_approval, confidence_score)
if arbitration.decision == "lock":
self.lock_state_active = True
self.lock_state_reason = "; ".join(arbitration.reasons)
is_valid = False
requires_approval = True
recommendations.extend([f"Arbitration decision: {arbitration.decision}", *arbitration.reasons])
logger.info(f"Action validation completed for user {self.user_id}. Decision: {arbitration.decision}, Valid: {is_valid}, Risk: {risk_level.value}, Violations: {len(violations)}")
logger.info(f"Action validation completed for user {self.user_id}. Valid: {is_valid}, Risk: {risk_level.value}, Violations: {len(violations)}")
# Record in history
await self._record_validation_history(action_data, is_valid, violations)
return SafetyValidation(
is_valid=is_valid,
risk_level=risk_level,
violations=violations,
recommendations=recommendations,
requires_approval=requires_approval,
confidence_score=confidence_score
confidence_score=max(0.0, min(1.0, confidence_score))
)
except Exception as e:
@@ -269,30 +235,6 @@ class SafetyConstraintManager:
confidence_score=0.0
)
def _arbitrate_decision(self, action_data: Dict[str, Any], risk_level: RiskLevel, violations: List[str], requires_approval: bool, confidence_score: float) -> SafetyArbitrationDecision:
"""Arbitrate allow/deny/lock with explicit reasons."""
reasons: List[str] = []
tier = int(action_data.get("recommended_tier", 1))
if self.lock_state_active:
reasons.append("Existing lock state is active")
return SafetyArbitrationDecision("lock", reasons, tier, confidence_score, True)
if tier >= 3 or risk_level == RiskLevel.CRITICAL:
reasons.append("Tier 3 systemic anomaly or critical risk detected")
if violations:
reasons.extend(violations)
return SafetyArbitrationDecision("lock", reasons, 3, confidence_score, True)
if violations or requires_approval:
reasons.append("Safety policy violation or approval requirement triggered")
reasons.extend(violations)
return SafetyArbitrationDecision("deny", reasons, tier, confidence_score, False)
reasons.append("No policy violations detected")
return SafetyArbitrationDecision("allow", reasons, tier, confidence_score, False)
def _determine_action_category(self, action_type: str) -> ActionCategory:
"""Determine the category of an action"""
action_type_lower = action_type.lower()