Compare commits

..

1 Commits

Author SHA1 Message Date
ي
6cef7c7257 Add capability-matrix checks and social action fallbacks 2026-05-18 16:02:08 +05:30
3 changed files with 114 additions and 182 deletions

View File

@@ -1,182 +0,0 @@
from __future__ import annotations
import json
from datetime import datetime, timezone
from typing import Optional
from urllib.parse import urlencode
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import RedirectResponse
from loguru import logger
from sqlalchemy import text
from sqlalchemy.orm import Session
from services.database import get_db
router = APIRouter(prefix="/v1/social-proxy", tags=["social-proxy"])
def _utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _ensure_tables(db: Session) -> None:
# Keep this router backward-compatible on tenant DBs without migrations.
db.execute(text("""
CREATE TABLE IF NOT EXISTS oauth_nonce_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
state TEXT NOT NULL UNIQUE,
nonce TEXT NOT NULL,
user_id TEXT NOT NULL,
platform TEXT NOT NULL,
channel_id INTEGER,
consumed_at TEXT,
expires_at TEXT,
created_at TEXT NOT NULL
)
"""))
db.execute(text("""
CREATE TABLE IF NOT EXISTS social_channels (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
platform TEXT NOT NULL,
platform_account_id TEXT NOT NULL,
token_bundle TEXT NOT NULL,
token_version INTEGER NOT NULL DEFAULT 1,
publication_linkage TEXT,
is_connected INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(platform, platform_account_id)
)
"""))
def _build_redirect(base_url: str, code: str, message: str, channel_id: Optional[int] = None) -> RedirectResponse:
params = {"code": code, "message": message}
if channel_id is not None:
params["channel_id"] = str(channel_id)
return RedirectResponse(url=f"{base_url}?{urlencode(params)}", status_code=303)
@router.get("/oauth/callback")
def oauth_callback(
state: str = Query(...),
platform: str = Query(...),
account_id: str = Query(...),
token_bundle: str = Query(..., description="Serialized token payload"),
ui_redirect: str = Query("/dashboard/connections"),
db: Session = Depends(get_db),
):
"""Consume OAuth callback, bind to user/platform, and upsert social channel connection."""
_ensure_tables(db)
record = db.execute(
text("""
SELECT id, nonce, user_id, platform, channel_id, consumed_at, expires_at
FROM oauth_nonce_sessions WHERE state = :state
"""),
{"state": state},
).mappings().first()
if not record:
return _build_redirect(ui_redirect, "invalid_state", "Missing OAuth session")
if record["consumed_at"] is not None:
return _build_redirect(ui_redirect, "state_reused", "OAuth state already consumed")
if record["platform"] != platform:
return _build_redirect(ui_redirect, "platform_mismatch", "Platform mismatch")
if record["expires_at"] and record["expires_at"] < _utc_now_iso():
return _build_redirect(ui_redirect, "state_expired", "OAuth session expired")
user_id = record["user_id"]
# Validate token payload is JSON.
try:
parsed_bundle = json.loads(token_bundle)
except json.JSONDecodeError as exc:
raise HTTPException(status_code=400, detail="Invalid token_bundle JSON") from exc
now = _utc_now_iso()
existing = db.execute(
text("""
SELECT id, publication_linkage, token_version
FROM social_channels
WHERE platform = :platform AND platform_account_id = :account_id
"""),
{"platform": platform, "account_id": account_id},
).mappings().first()
if existing:
# Reconnect path: preserve publication linkage and bump token version.
db.execute(
text("""
UPDATE social_channels
SET user_id = :user_id,
token_bundle = :token_bundle,
token_version = :token_version,
is_connected = 1,
updated_at = :updated_at
WHERE id = :id
"""),
{
"id": existing["id"],
"user_id": user_id,
"token_bundle": json.dumps(parsed_bundle),
"token_version": int(existing["token_version"] or 0) + 1,
"updated_at": now,
},
)
channel_id = existing["id"]
result_code = "reconnected"
result_message = "Channel reconnected"
else:
db.execute(
text("""
INSERT INTO social_channels (
user_id, platform, platform_account_id, token_bundle,
token_version, publication_linkage, is_connected, created_at, updated_at
) VALUES (
:user_id, :platform, :account_id, :token_bundle,
1, :publication_linkage, 1, :created_at, :updated_at
)
"""),
{
"user_id": user_id,
"platform": platform,
"account_id": account_id,
"token_bundle": json.dumps(parsed_bundle),
"publication_linkage": None,
"created_at": now,
"updated_at": now,
},
)
channel_id = db.execute(text("SELECT last_insert_rowid()")).scalar_one()
result_code = "connected"
result_message = "Channel connected"
# Bind callback session to concrete channel/user/platform and mark consumed.
db.execute(
text("""
UPDATE oauth_nonce_sessions
SET consumed_at = :consumed_at,
channel_id = :channel_id,
user_id = :user_id,
platform = :platform
WHERE id = :id
"""),
{
"id": record["id"],
"consumed_at": now,
"channel_id": channel_id,
"user_id": user_id,
"platform": platform,
},
)
db.commit()
logger.info(f"OAuth callback complete user={user_id} platform={platform} channel_id={channel_id}")
return _build_redirect(ui_redirect, result_code, result_message, channel_id)

View File

@@ -697,6 +697,39 @@ class BaseALwrityAgent(ABC):
"action_id": action.action_id,
"agent_id": self.agent_id,
}
capability_decision = self._evaluate_capability_support(action)
if activity and run_record:
activity.log_event(
event_type="decision",
severity="info" if capability_decision.get("supported", False) else "warning",
message=capability_decision.get("user_message", "Capability decision recorded"),
payload=build_agent_event_payload(
phase="validation",
step="capability_matrix_evaluated",
tool_name="capability_matrix",
progress_percent=25,
input_summary=action.action_type,
output_summary="Supported action" if capability_decision.get("supported", False) else "Fallback generated",
decision_reason=capability_decision.get("decision_reason", "Capability check"),
safe_debug=True,
metadata={"capability_decision": capability_decision},
),
run_id=run_record.id,
agent_type=self.agent_type,
)
if not capability_decision.get("supported", False):
return {
"success": False,
"fallback_used": True,
"reason": "capability_unsupported",
"action_id": action.action_id,
"agent_id": self.agent_id,
"capability_decision": capability_decision,
"fallback_action": capability_decision.get("fallback_action"),
"user_message": capability_decision.get("user_message"),
}
# 2. Create rollback checkpoint
try:
@@ -912,6 +945,83 @@ class BaseALwrityAgent(ABC):
Please execute this action and provide a detailed response.
Consider user goals, safety constraints, and potential impacts.
"""
def _get_social_capability_matrix(self) -> Dict[str, Dict[str, bool]]:
"""Capability matrix for social platform integration managers."""
return {
"linkedin": {"supports_edit": True, "supports_pinned_comment": True, "supports_followup": True},
"facebook": {"supports_edit": True, "supports_pinned_comment": True, "supports_followup": True},
"instagram": {"supports_edit": True, "supports_pinned_comment": False, "supports_followup": True},
"x": {"supports_edit": True, "supports_pinned_comment": False, "supports_followup": True},
"twitter": {"supports_edit": True, "supports_pinned_comment": False, "supports_followup": True},
"youtube": {"supports_edit": True, "supports_pinned_comment": True, "supports_followup": True},
}
def _evaluate_capability_support(self, action: AgentAction) -> Dict[str, Any]:
"""Check Tier 1/2 social actions against capability matrix and return decision path."""
platform = str(action.parameters.get("platform", "")).strip().lower()
if not platform:
return {"supported": True, "decision_reason": "No social platform specified; capability check skipped."}
matrix = self._get_social_capability_matrix()
platform_caps = matrix.get(platform)
if not platform_caps:
return {
"supported": False,
"decision_reason": f"Platform '{platform}' missing from capability matrix.",
"fallback_action": self._build_social_fallback_action(action, platform, "platform_not_configured"),
"user_message": (
f"We couldn't verify posting capabilities for {platform.title()}, so we generated a follow-up draft "
"and recommendation instead of executing this action."
),
}
action_tier = str(action.parameters.get("action_tier", "")).strip().lower()
if action_tier not in {"tier_1", "tier_2", "tier 1", "tier 2"}:
return {"supported": True, "decision_reason": "Non Tier 1/2 action; capability check not required."}
action_type = action.action_type.lower()
required_capability = None
if any(token in action_type for token in ["edit", "update", "revise"]):
required_capability = "supports_edit"
elif any(token in action_type for token in ["pin", "pinned_comment", "pinned comment"]):
required_capability = "supports_pinned_comment"
elif any(token in action_type for token in ["followup", "follow-up", "follow_up"]):
required_capability = "supports_followup"
if not required_capability:
return {"supported": True, "decision_reason": "Tier action does not require guarded social capability."}
supported = bool(platform_caps.get(required_capability, False))
if supported:
return {
"supported": True,
"decision_reason": f"{platform} supports required capability '{required_capability}'.",
"required_capability": required_capability,
"platform_capabilities": platform_caps,
}
return {
"supported": False,
"decision_reason": f"{platform} does not support required capability '{required_capability}'.",
"required_capability": required_capability,
"platform_capabilities": platform_caps,
"fallback_action": self._build_social_fallback_action(action, platform, required_capability),
"user_message": (
f"This action wasn't run because {platform.title()} does not support {required_capability}. "
"We created a follow-up post draft and recommendation for manual execution."
),
}
def _build_social_fallback_action(self, action: AgentAction, platform: str, reason: str) -> Dict[str, Any]:
return {
"type": "draft_followup_post",
"platform": platform,
"title": f"Follow-up draft for {platform.title()}",
"draft": f"Follow-up for original action '{action.action_type}' on {action.target_resource}.",
"recommendation": "Review and publish manually, then notify the team.",
"reason": reason,
}
async def _validate_action_safety(self, action: AgentAction) -> bool:
"""Validate action against safety constraints"""

View File

@@ -69,6 +69,10 @@ class SocialAmplificationAgent(BaseALwrityAgent):
# Instruction will be provided via orchestrator context or initial prompt
# Instruction should be provided during invocation or via orchestrator context
)
def get_social_integration_capabilities(self) -> Dict[str, Dict[str, bool]]:
"""Expose platform capability flags used by social integration managers."""
return self._get_social_capability_matrix()
# Tool Implementations