Compare commits

..

1 Commits

Author SHA1 Message Date
ي
6cef7c7257 Add capability-matrix checks and social action fallbacks 2026-05-18 16:02:08 +05:30
6 changed files with 125 additions and 86 deletions

View File

@@ -40,10 +40,6 @@ class OAuthTokenMonitoringTask(Base):
# Scheduling # Scheduling
next_check = Column(DateTime, nullable=True, index=True) # Next scheduled check time next_check = Column(DateTime, nullable=True, index=True) # Next scheduled check time
next_retry_at = Column(DateTime, nullable=True, index=True) # Backoff retry schedule for refresh failures
refresh_attempts = Column(Integer, default=0) # Current retry attempt count for refresh workflow
terminal_failure_reason = Column(Text, nullable=True) # Permanent failure reason requiring user action
channel_status = Column(String(32), default='connected') # connected, degraded, disconnected
# Metadata # Metadata
created_at = Column(DateTime, default=datetime.utcnow) created_at = Column(DateTime, default=datetime.utcnow)
@@ -101,3 +97,4 @@ class OAuthTokenExecutionLog(Base):
def __repr__(self): def __repr__(self):
return f"<OAuthTokenExecutionLog(id={self.id}, task_id={self.task_id}, status={self.status}, execution_date={self.execution_date})>" return f"<OAuthTokenExecutionLog(id={self.id}, task_id={self.task_id}, status={self.status}, execution_date={self.execution_date})>"

View File

@@ -698,6 +698,39 @@ class BaseALwrityAgent(ABC):
"agent_id": self.agent_id, "agent_id": self.agent_id,
} }
capability_decision = self._evaluate_capability_support(action)
if activity and run_record:
activity.log_event(
event_type="decision",
severity="info" if capability_decision.get("supported", False) else "warning",
message=capability_decision.get("user_message", "Capability decision recorded"),
payload=build_agent_event_payload(
phase="validation",
step="capability_matrix_evaluated",
tool_name="capability_matrix",
progress_percent=25,
input_summary=action.action_type,
output_summary="Supported action" if capability_decision.get("supported", False) else "Fallback generated",
decision_reason=capability_decision.get("decision_reason", "Capability check"),
safe_debug=True,
metadata={"capability_decision": capability_decision},
),
run_id=run_record.id,
agent_type=self.agent_type,
)
if not capability_decision.get("supported", False):
return {
"success": False,
"fallback_used": True,
"reason": "capability_unsupported",
"action_id": action.action_id,
"agent_id": self.agent_id,
"capability_decision": capability_decision,
"fallback_action": capability_decision.get("fallback_action"),
"user_message": capability_decision.get("user_message"),
}
# 2. Create rollback checkpoint # 2. Create rollback checkpoint
try: try:
# Capture current system state # Capture current system state
@@ -913,6 +946,83 @@ class BaseALwrityAgent(ABC):
Consider user goals, safety constraints, and potential impacts. Consider user goals, safety constraints, and potential impacts.
""" """
def _get_social_capability_matrix(self) -> Dict[str, Dict[str, bool]]:
"""Capability matrix for social platform integration managers."""
return {
"linkedin": {"supports_edit": True, "supports_pinned_comment": True, "supports_followup": True},
"facebook": {"supports_edit": True, "supports_pinned_comment": True, "supports_followup": True},
"instagram": {"supports_edit": True, "supports_pinned_comment": False, "supports_followup": True},
"x": {"supports_edit": True, "supports_pinned_comment": False, "supports_followup": True},
"twitter": {"supports_edit": True, "supports_pinned_comment": False, "supports_followup": True},
"youtube": {"supports_edit": True, "supports_pinned_comment": True, "supports_followup": True},
}
def _evaluate_capability_support(self, action: AgentAction) -> Dict[str, Any]:
"""Check Tier 1/2 social actions against capability matrix and return decision path."""
platform = str(action.parameters.get("platform", "")).strip().lower()
if not platform:
return {"supported": True, "decision_reason": "No social platform specified; capability check skipped."}
matrix = self._get_social_capability_matrix()
platform_caps = matrix.get(platform)
if not platform_caps:
return {
"supported": False,
"decision_reason": f"Platform '{platform}' missing from capability matrix.",
"fallback_action": self._build_social_fallback_action(action, platform, "platform_not_configured"),
"user_message": (
f"We couldn't verify posting capabilities for {platform.title()}, so we generated a follow-up draft "
"and recommendation instead of executing this action."
),
}
action_tier = str(action.parameters.get("action_tier", "")).strip().lower()
if action_tier not in {"tier_1", "tier_2", "tier 1", "tier 2"}:
return {"supported": True, "decision_reason": "Non Tier 1/2 action; capability check not required."}
action_type = action.action_type.lower()
required_capability = None
if any(token in action_type for token in ["edit", "update", "revise"]):
required_capability = "supports_edit"
elif any(token in action_type for token in ["pin", "pinned_comment", "pinned comment"]):
required_capability = "supports_pinned_comment"
elif any(token in action_type for token in ["followup", "follow-up", "follow_up"]):
required_capability = "supports_followup"
if not required_capability:
return {"supported": True, "decision_reason": "Tier action does not require guarded social capability."}
supported = bool(platform_caps.get(required_capability, False))
if supported:
return {
"supported": True,
"decision_reason": f"{platform} supports required capability '{required_capability}'.",
"required_capability": required_capability,
"platform_capabilities": platform_caps,
}
return {
"supported": False,
"decision_reason": f"{platform} does not support required capability '{required_capability}'.",
"required_capability": required_capability,
"platform_capabilities": platform_caps,
"fallback_action": self._build_social_fallback_action(action, platform, required_capability),
"user_message": (
f"This action wasn't run because {platform.title()} does not support {required_capability}. "
"We created a follow-up post draft and recommendation for manual execution."
),
}
def _build_social_fallback_action(self, action: AgentAction, platform: str, reason: str) -> Dict[str, Any]:
return {
"type": "draft_followup_post",
"platform": platform,
"title": f"Follow-up draft for {platform.title()}",
"draft": f"Follow-up for original action '{action.action_type}' on {action.target_resource}.",
"recommendation": "Review and publish manually, then notify the team.",
"reason": reason,
}
async def _validate_action_safety(self, action: AgentAction) -> bool: async def _validate_action_safety(self, action: AgentAction) -> bool:
"""Validate action against safety constraints""" """Validate action against safety constraints"""
try: try:

View File

@@ -70,6 +70,10 @@ class SocialAmplificationAgent(BaseALwrityAgent):
# Instruction should be provided during invocation or via orchestrator context # Instruction should be provided during invocation or via orchestrator context
) )
def get_social_integration_capabilities(self) -> Dict[str, Dict[str, bool]]:
"""Expose platform capability flags used by social integration managers."""
return self._get_social_capability_matrix()
# Tool Implementations # Tool Implementations
def _social_monitor_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: def _social_monitor_tool(self, context: Dict[str, Any]) -> Dict[str, Any]:

View File

@@ -26,10 +26,7 @@ from .executors.advertools_executor import AdvertoolsExecutor
from .executors.sif_indexing_executor import SIFIndexingExecutor from .executors.sif_indexing_executor import SIFIndexingExecutor
from .executors.market_trends_executor import MarketTrendsExecutor from .executors.market_trends_executor import MarketTrendsExecutor
from .utils.task_loader import load_due_monitoring_tasks from .utils.task_loader import load_due_monitoring_tasks
from .utils.oauth_token_task_loader import ( from .utils.oauth_token_task_loader import load_due_oauth_token_monitoring_tasks
load_due_oauth_token_monitoring_tasks,
load_near_expiry_oauth_token_tasks
)
from .utils.website_analysis_task_loader import load_due_website_analysis_tasks from .utils.website_analysis_task_loader import load_due_website_analysis_tasks
from .utils.onboarding_full_website_analysis_task_loader import load_due_onboarding_full_website_analysis_tasks from .utils.onboarding_full_website_analysis_task_loader import load_due_onboarding_full_website_analysis_tasks
from .utils.deep_competitor_analysis_task_loader import load_due_deep_competitor_analysis_tasks from .utils.deep_competitor_analysis_task_loader import load_due_deep_competitor_analysis_tasks
@@ -73,11 +70,6 @@ def get_scheduler() -> TaskScheduler:
oauth_token_executor, oauth_token_executor,
load_due_oauth_token_monitoring_tasks load_due_oauth_token_monitoring_tasks
) )
_scheduler_instance.register_executor(
'oauth_token_refresh',
oauth_token_executor,
load_near_expiry_oauth_token_tasks
)
# Register website analysis executor # Register website analysis executor
website_analysis_executor = WebsiteAnalysisExecutor() website_analysis_executor = WebsiteAnalysisExecutor()

View File

@@ -42,8 +42,6 @@ class OAuthTokenMonitoringExecutor(TaskExecutor):
self.exception_handler = SchedulerExceptionHandler() self.exception_handler = SchedulerExceptionHandler()
# Expiration warning window (7 days before expiration) # Expiration warning window (7 days before expiration)
self.expiration_warning_days = 7 self.expiration_warning_days = 7
self.max_refresh_retries = 3
self.base_retry_backoff_minutes = 15
async def execute_task(self, task: OAuthTokenMonitoringTask, db: Session) -> TaskExecutionResult: async def execute_task(self, task: OAuthTokenMonitoringTask, db: Session) -> TaskExecutionResult:
""" """
@@ -95,10 +93,6 @@ class OAuthTokenMonitoringExecutor(TaskExecutor):
task.last_success = datetime.utcnow() task.last_success = datetime.utcnow()
task.status = 'active' task.status = 'active'
task.failure_reason = None task.failure_reason = None
task.terminal_failure_reason = None
task.channel_status = 'connected'
task.refresh_attempts = 0
task.next_retry_at = None
# Reset failure tracking on success # Reset failure tracking on success
task.consecutive_failures = 0 task.consecutive_failures = 0
task.failure_pattern = None task.failure_pattern = None
@@ -118,7 +112,6 @@ class OAuthTokenMonitoringExecutor(TaskExecutor):
task.last_failure = datetime.utcnow() task.last_failure = datetime.utcnow()
task.failure_reason = result.error_message task.failure_reason = result.error_message
task.refresh_attempts = (task.refresh_attempts or 0) + 1
if pattern and pattern.should_cool_off: if pattern and pattern.should_cool_off:
# Mark task for human intervention # Mark task for human intervention
@@ -133,9 +126,6 @@ class OAuthTokenMonitoringExecutor(TaskExecutor):
} }
# Clear next_check - task won't run automatically # Clear next_check - task won't run automatically
task.next_check = None task.next_check = None
task.next_retry_at = None
task.channel_status = "disconnected"
task.terminal_failure_reason = result.error_message
self.logger.warning( self.logger.warning(
f"Task {task.id} marked for human intervention: " f"Task {task.id} marked for human intervention: "
@@ -143,17 +133,10 @@ class OAuthTokenMonitoringExecutor(TaskExecutor):
f"reason: {pattern.failure_reason.value}" f"reason: {pattern.failure_reason.value}"
) )
else: else:
# Normal failure handling
task.status = 'failed'
task.consecutive_failures = (task.consecutive_failures or 0) + 1 task.consecutive_failures = (task.consecutive_failures or 0) + 1
if task.refresh_attempts >= self.max_refresh_retries: # Do NOT update next_check - wait for manual trigger
task.status = 'failed'
task.channel_status = 'disconnected'
task.terminal_failure_reason = result.error_message
task.next_retry_at = None
else:
task.status = 'degraded'
task.channel_status = 'degraded'
delay_minutes = self.base_retry_backoff_minutes * (2 ** (task.refresh_attempts - 1))
task.next_retry_at = datetime.utcnow() + timedelta(minutes=delay_minutes)
self.logger.warning( self.logger.warning(
f"OAuth token refresh failed for user {user_id}, platform {platform}. " f"OAuth token refresh failed for user {user_id}, platform {platform}. "
@@ -161,7 +144,7 @@ class OAuthTokenMonitoringExecutor(TaskExecutor):
) )
# Create UsageAlert notification for the user # Create UsageAlert notification for the user
self._create_failure_alert(user_id, platform, result.error_message, result.result_data, db, task) self._create_failure_alert(user_id, platform, result.error_message, result.result_data, db)
task.updated_at = datetime.utcnow() task.updated_at = datetime.utcnow()
db.commit() db.commit()
@@ -210,14 +193,12 @@ class OAuthTokenMonitoringExecutor(TaskExecutor):
task.last_failure = datetime.utcnow() task.last_failure = datetime.utcnow()
task.failure_reason = str(e) task.failure_reason = str(e)
task.status = 'failed' task.status = 'failed'
task.channel_status = 'disconnected'
task.terminal_failure_reason = str(e)
task.last_check = datetime.utcnow() task.last_check = datetime.utcnow()
task.updated_at = datetime.utcnow() task.updated_at = datetime.utcnow()
task.next_retry_at = None # Do NOT update next_check - wait for manual trigger
# Create UsageAlert notification for the user # Create UsageAlert notification for the user
self._create_failure_alert(user_id, task.platform, str(e), None, db, task) self._create_failure_alert(user_id, task.platform, str(e), None, db)
db.commit() db.commit()
except Exception as commit_error: except Exception as commit_error:
@@ -670,8 +651,7 @@ class OAuthTokenMonitoringExecutor(TaskExecutor):
platform: str, platform: str,
error_message: str, error_message: str,
result_data: Optional[Dict[str, Any]], result_data: Optional[Dict[str, Any]],
db: Session, db: Session
task: Optional[OAuthTokenMonitoringTask] = None
): ):
""" """
Create a UsageAlert notification when OAuth token refresh fails. Create a UsageAlert notification when OAuth token refresh fails.
@@ -744,20 +724,6 @@ class OAuthTokenMonitoringExecutor(TaskExecutor):
from datetime import datetime from datetime import datetime
billing_period = datetime.utcnow().strftime("%Y-%m") billing_period = datetime.utcnow().strftime("%Y-%m")
alert_payload = {
"requires_user_action": True,
"platform": platform,
"channel_status": getattr(task, "channel_status", "disconnected"),
"terminal_failure_reason": getattr(task, "terminal_failure_reason", error_message),
"next_retry_at": (
task.next_retry_at.isoformat() if task and task.next_retry_at else None
),
"refresh_attempts": getattr(task, "refresh_attempts", 0),
"max_refresh_retries": self.max_refresh_retries,
}
message = f"{message} [ALERT_PAYLOAD] {alert_payload}"
# Create UsageAlert # Create UsageAlert
alert = UsageAlert( alert = UsageAlert(
user_id=user_id, user_id=user_id,
@@ -820,3 +786,4 @@ class OAuthTokenMonitoringExecutor(TaskExecutor):
f"Defaulting to Weekly (7 days)." f"Defaulting to Weekly (7 days)."
) )
return last_execution + timedelta(days=7) return last_execution + timedelta(days=7)

View File

@@ -3,7 +3,7 @@ OAuth Token Monitoring Task Loader
Functions to load due OAuth token monitoring tasks from database. Functions to load due OAuth token monitoring tasks from database.
""" """
from datetime import datetime, timedelta from datetime import datetime
from typing import List, Optional, Union from typing import List, Optional, Union
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from sqlalchemy import and_, or_ from sqlalchemy import and_, or_
@@ -52,34 +52,3 @@ def load_due_oauth_token_monitoring_tasks(
return query.all() return query.all()
def load_near_expiry_oauth_token_tasks(
db: Session,
refresh_horizon_hours: int = 24,
user_id: Optional[Union[str, int]] = None
) -> List[OAuthTokenMonitoringTask]:
"""
Load OAuth tasks that should run token refresh logic soon.
Includes:
- tasks with a scheduled retry now due (next_retry_at <= now)
- tasks whose routine check is inside the near-expiry horizon window
"""
now = datetime.utcnow()
horizon = now + timedelta(hours=max(refresh_horizon_hours, 1))
query = db.query(OAuthTokenMonitoringTask).filter(
and_(
OAuthTokenMonitoringTask.status.in_(['active', 'failed', 'degraded']),
or_(
OAuthTokenMonitoringTask.next_retry_at <= now,
OAuthTokenMonitoringTask.next_check <= horizon,
OAuthTokenMonitoringTask.next_check.is_(None)
)
)
)
if user_id is not None:
query = query.filter(OAuthTokenMonitoringTask.user_id == str(user_id))
return query.all()