Compare commits

..

1 Commits

Author SHA1 Message Date
ي
fb75377d37 Add OAuth social proxy callback binding and reconnect handling 2026-05-18 15:57:22 +05:30
2 changed files with 183 additions and 108 deletions

View File

@@ -0,0 +1,182 @@
from __future__ import annotations
import json
from datetime import datetime, timezone
from typing import Optional
from urllib.parse import urlencode
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import RedirectResponse
from loguru import logger
from sqlalchemy import text
from sqlalchemy.orm import Session
from services.database import get_db
router = APIRouter(prefix="/v1/social-proxy", tags=["social-proxy"])
def _utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _ensure_tables(db: Session) -> None:
# Keep this router backward-compatible on tenant DBs without migrations.
db.execute(text("""
CREATE TABLE IF NOT EXISTS oauth_nonce_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
state TEXT NOT NULL UNIQUE,
nonce TEXT NOT NULL,
user_id TEXT NOT NULL,
platform TEXT NOT NULL,
channel_id INTEGER,
consumed_at TEXT,
expires_at TEXT,
created_at TEXT NOT NULL
)
"""))
db.execute(text("""
CREATE TABLE IF NOT EXISTS social_channels (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
platform TEXT NOT NULL,
platform_account_id TEXT NOT NULL,
token_bundle TEXT NOT NULL,
token_version INTEGER NOT NULL DEFAULT 1,
publication_linkage TEXT,
is_connected INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(platform, platform_account_id)
)
"""))
def _build_redirect(base_url: str, code: str, message: str, channel_id: Optional[int] = None) -> RedirectResponse:
params = {"code": code, "message": message}
if channel_id is not None:
params["channel_id"] = str(channel_id)
return RedirectResponse(url=f"{base_url}?{urlencode(params)}", status_code=303)
@router.get("/oauth/callback")
def oauth_callback(
state: str = Query(...),
platform: str = Query(...),
account_id: str = Query(...),
token_bundle: str = Query(..., description="Serialized token payload"),
ui_redirect: str = Query("/dashboard/connections"),
db: Session = Depends(get_db),
):
"""Consume OAuth callback, bind to user/platform, and upsert social channel connection."""
_ensure_tables(db)
record = db.execute(
text("""
SELECT id, nonce, user_id, platform, channel_id, consumed_at, expires_at
FROM oauth_nonce_sessions WHERE state = :state
"""),
{"state": state},
).mappings().first()
if not record:
return _build_redirect(ui_redirect, "invalid_state", "Missing OAuth session")
if record["consumed_at"] is not None:
return _build_redirect(ui_redirect, "state_reused", "OAuth state already consumed")
if record["platform"] != platform:
return _build_redirect(ui_redirect, "platform_mismatch", "Platform mismatch")
if record["expires_at"] and record["expires_at"] < _utc_now_iso():
return _build_redirect(ui_redirect, "state_expired", "OAuth session expired")
user_id = record["user_id"]
# Validate token payload is JSON.
try:
parsed_bundle = json.loads(token_bundle)
except json.JSONDecodeError as exc:
raise HTTPException(status_code=400, detail="Invalid token_bundle JSON") from exc
now = _utc_now_iso()
existing = db.execute(
text("""
SELECT id, publication_linkage, token_version
FROM social_channels
WHERE platform = :platform AND platform_account_id = :account_id
"""),
{"platform": platform, "account_id": account_id},
).mappings().first()
if existing:
# Reconnect path: preserve publication linkage and bump token version.
db.execute(
text("""
UPDATE social_channels
SET user_id = :user_id,
token_bundle = :token_bundle,
token_version = :token_version,
is_connected = 1,
updated_at = :updated_at
WHERE id = :id
"""),
{
"id": existing["id"],
"user_id": user_id,
"token_bundle": json.dumps(parsed_bundle),
"token_version": int(existing["token_version"] or 0) + 1,
"updated_at": now,
},
)
channel_id = existing["id"]
result_code = "reconnected"
result_message = "Channel reconnected"
else:
db.execute(
text("""
INSERT INTO social_channels (
user_id, platform, platform_account_id, token_bundle,
token_version, publication_linkage, is_connected, created_at, updated_at
) VALUES (
:user_id, :platform, :account_id, :token_bundle,
1, :publication_linkage, 1, :created_at, :updated_at
)
"""),
{
"user_id": user_id,
"platform": platform,
"account_id": account_id,
"token_bundle": json.dumps(parsed_bundle),
"publication_linkage": None,
"created_at": now,
"updated_at": now,
},
)
channel_id = db.execute(text("SELECT last_insert_rowid()")).scalar_one()
result_code = "connected"
result_message = "Channel connected"
# Bind callback session to concrete channel/user/platform and mark consumed.
db.execute(
text("""
UPDATE oauth_nonce_sessions
SET consumed_at = :consumed_at,
channel_id = :channel_id,
user_id = :user_id,
platform = :platform
WHERE id = :id
"""),
{
"id": record["id"],
"consumed_at": now,
"channel_id": channel_id,
"user_id": user_id,
"platform": platform,
},
)
db.commit()
logger.info(f"OAuth callback complete user={user_id} platform={platform} channel_id={channel_id}")
return _build_redirect(ui_redirect, result_code, result_message, channel_id)

View File

@@ -101,7 +101,6 @@ class AgentContextVFS:
"/steps/integrations": AgentFlatContextStore.STEP5_FILENAME,
}
HIGH_SIGNAL_MARKERS = ("agent_summary", "high_signal_terms", "quick_facts", "context_type")
LOW_CONFIDENCE_MARKER = "low_confidence"
def __init__(self, user_id: str, project_id: Optional[str] = None):
self.user_id = user_id
@@ -295,101 +294,6 @@ class AgentContextVFS:
)
return ranked[: max(1, top_k)]
@staticmethod
def _mnemonic_token(result: Dict[str, Any], rank: int) -> str:
"""Create compressed mnemonic token with source reference."""
path = str(result.get("path") or "unknown")
reason = str(result.get("reason") or "match")
confidence = float(result.get("confidence") or 0.0)
low_flag = "!" if result.get(AgentContextVFS.LOW_CONFIDENCE_MARKER) else ""
src = path.replace(".json", "").replace("_", "-")[:28]
hint = reason.replace(" ", "-")[:20]
return f"M{rank}:{src}|{hint}|c{confidence:.2f}{low_flag}"
@staticmethod
def _detect_contradictions(results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Detect contradictory learnings by path with conflicting reasons/relevance classes."""
by_path: Dict[str, List[Dict[str, Any]]] = {}
for item in results:
p = str(item.get("path") or "")
by_path.setdefault(p, []).append(item)
contradictions: List[Dict[str, Any]] = []
for path, rows in by_path.items():
reasons = {str(r.get("reason") or "").strip().lower() for r in rows}
relevance = {str(r.get("relevance") or "").strip().lower() for r in rows}
# contradictory if both high/supported or mixed summary/body signals in same source cluster
if len(reasons) > 1 and len(relevance) > 1:
contradictions.append(
{
"path": path,
"reason_variants": sorted([r for r in reasons if r]),
"relevance_variants": sorted([r for r in relevance if r]),
"count": len(rows),
}
)
return contradictions
def _run_synthesis_pipeline(
self, ranked_results: List[Dict[str, Any]], *, char_budget: int = 1200, top_k: int = 5
) -> Dict[str, Any]:
"""
Flat-context synthesis pipeline:
1) Compress telemetry into mnemonic tokens with source references
2) Detect contradictions and mark low-confidence heuristics
3) Select top-ranked, budget-fitting tokens for prompt injection
4) Persist synthesis + source lineage for explainability
"""
contradictions = self._detect_contradictions(ranked_results)
contradiction_paths = {c["path"] for c in contradictions}
normalized: List[Dict[str, Any]] = []
for idx, item in enumerate(ranked_results, start=1):
row = dict(item)
low_conf = bool(row.get("low_probability")) or (str(row.get("path") or "") in contradiction_paths)
row[self.LOW_CONFIDENCE_MARKER] = low_conf
if low_conf:
row["confidence"] = round(max(0.05, float(row.get("confidence", 0.0)) * 0.7), 3)
row["mnemonic_token"] = self._mnemonic_token(row, idx)
normalized.append(row)
chosen: List[Dict[str, Any]] = []
used = 0
for row in normalized[: max(1, top_k * 3)]:
token = str(row.get("mnemonic_token") or "")
cost = len(token) + 8
if chosen and used + cost > char_budget:
continue
chosen.append(row)
used += cost
if len(chosen) >= top_k:
break
synthesis = {
"created_at": datetime.now(timezone.utc).isoformat(),
"top_k": top_k,
"char_budget": char_budget,
"char_budget_used": used,
"selected_mnemonics": [c.get("mnemonic_token") for c in chosen],
"source_lineage": [
{
"mnemonic_token": c.get("mnemonic_token"),
"path": c.get("path"),
"reason": c.get("reason"),
"confidence": c.get("confidence"),
"low_confidence": c.get(self.LOW_CONFIDENCE_MARKER, False),
}
for c in chosen
],
"contradictions": contradictions,
}
self.append_activity_log(
event_type="flat_context_synthesis",
actor="agent_context_vfs",
details=synthesis,
)
return {"ranked_results": normalized, "synthesis": synthesis}
@staticmethod
def _resolve_json_path(data: Any, path_query: str) -> Any:
"""Resolve dot/bracket JSON path such as 'data.seo_audit.recommendations[0]'."""
@@ -614,26 +518,15 @@ class AgentContextVFS:
bounded_results.append(r)
used += cost
synthesis_bundle = self._run_synthesis_pipeline(
self._static_triage(bounded_results, normalized),
char_budget=1200,
top_k=5,
)
triaged_results = synthesis_bundle["ranked_results"]
synthesis = synthesis_bundle["synthesis"]
result = {
"query": normalized,
"attempted_queries": attempted_queries,
"matched_files_count": len(matched_files),
"results": triaged_results,
"results": self._static_triage(bounded_results, normalized),
"notice": notice,
"char_budget_used": used,
"can_answer": bool(bounded_results),
"synthesis": synthesis,
"prompt_context_mnemonics": synthesis.get("selected_mnemonics", []),
}
# Top-ranked, budget-fitting mnemonic tokens are the only ones intended for prompt context injection.
result["triage_top5"] = self._llm_router_stub(result["results"], top_k=5)
logger.info(
f"[vfs_audit] user={self.store.safe_user_id} action=search_context query={normalized!r} results={len(result['results'])}"