Compare commits

..

1 Commits

Author SHA1 Message Date
ي
6fdf318d79 Add OAuth token refresh retries, status persistence, and alert payloads 2026-05-18 15:56:57 +05:30
5 changed files with 86 additions and 282 deletions

View File

@@ -40,6 +40,10 @@ class OAuthTokenMonitoringTask(Base):
# Scheduling # Scheduling
next_check = Column(DateTime, nullable=True, index=True) # Next scheduled check time next_check = Column(DateTime, nullable=True, index=True) # Next scheduled check time
next_retry_at = Column(DateTime, nullable=True, index=True) # Backoff retry schedule for refresh failures
refresh_attempts = Column(Integer, default=0) # Current retry attempt count for refresh workflow
terminal_failure_reason = Column(Text, nullable=True) # Permanent failure reason requiring user action
channel_status = Column(String(32), default='connected') # connected, degraded, disconnected
# Metadata # Metadata
created_at = Column(DateTime, default=datetime.utcnow) created_at = Column(DateTime, default=datetime.utcnow)
@@ -97,4 +101,3 @@ class OAuthTokenExecutionLog(Base):
def __repr__(self): def __repr__(self):
return f"<OAuthTokenExecutionLog(id={self.id}, task_id={self.task_id}, status={self.status}, execution_date={self.execution_date})>" return f"<OAuthTokenExecutionLog(id={self.id}, task_id={self.task_id}, status={self.status}, execution_date={self.execution_date})>"

View File

@@ -26,7 +26,10 @@ from .executors.advertools_executor import AdvertoolsExecutor
from .executors.sif_indexing_executor import SIFIndexingExecutor from .executors.sif_indexing_executor import SIFIndexingExecutor
from .executors.market_trends_executor import MarketTrendsExecutor from .executors.market_trends_executor import MarketTrendsExecutor
from .utils.task_loader import load_due_monitoring_tasks from .utils.task_loader import load_due_monitoring_tasks
from .utils.oauth_token_task_loader import load_due_oauth_token_monitoring_tasks from .utils.oauth_token_task_loader import (
load_due_oauth_token_monitoring_tasks,
load_near_expiry_oauth_token_tasks
)
from .utils.website_analysis_task_loader import load_due_website_analysis_tasks from .utils.website_analysis_task_loader import load_due_website_analysis_tasks
from .utils.onboarding_full_website_analysis_task_loader import load_due_onboarding_full_website_analysis_tasks from .utils.onboarding_full_website_analysis_task_loader import load_due_onboarding_full_website_analysis_tasks
from .utils.deep_competitor_analysis_task_loader import load_due_deep_competitor_analysis_tasks from .utils.deep_competitor_analysis_task_loader import load_due_deep_competitor_analysis_tasks
@@ -70,6 +73,11 @@ def get_scheduler() -> TaskScheduler:
oauth_token_executor, oauth_token_executor,
load_due_oauth_token_monitoring_tasks load_due_oauth_token_monitoring_tasks
) )
_scheduler_instance.register_executor(
'oauth_token_refresh',
oauth_token_executor,
load_near_expiry_oauth_token_tasks
)
# Register website analysis executor # Register website analysis executor
website_analysis_executor = WebsiteAnalysisExecutor() website_analysis_executor = WebsiteAnalysisExecutor()

View File

@@ -42,6 +42,8 @@ class OAuthTokenMonitoringExecutor(TaskExecutor):
self.exception_handler = SchedulerExceptionHandler() self.exception_handler = SchedulerExceptionHandler()
# Expiration warning window (7 days before expiration) # Expiration warning window (7 days before expiration)
self.expiration_warning_days = 7 self.expiration_warning_days = 7
self.max_refresh_retries = 3
self.base_retry_backoff_minutes = 15
async def execute_task(self, task: OAuthTokenMonitoringTask, db: Session) -> TaskExecutionResult: async def execute_task(self, task: OAuthTokenMonitoringTask, db: Session) -> TaskExecutionResult:
""" """
@@ -93,6 +95,10 @@ class OAuthTokenMonitoringExecutor(TaskExecutor):
task.last_success = datetime.utcnow() task.last_success = datetime.utcnow()
task.status = 'active' task.status = 'active'
task.failure_reason = None task.failure_reason = None
task.terminal_failure_reason = None
task.channel_status = 'connected'
task.refresh_attempts = 0
task.next_retry_at = None
# Reset failure tracking on success # Reset failure tracking on success
task.consecutive_failures = 0 task.consecutive_failures = 0
task.failure_pattern = None task.failure_pattern = None
@@ -112,6 +118,7 @@ class OAuthTokenMonitoringExecutor(TaskExecutor):
task.last_failure = datetime.utcnow() task.last_failure = datetime.utcnow()
task.failure_reason = result.error_message task.failure_reason = result.error_message
task.refresh_attempts = (task.refresh_attempts or 0) + 1
if pattern and pattern.should_cool_off: if pattern and pattern.should_cool_off:
# Mark task for human intervention # Mark task for human intervention
@@ -126,6 +133,9 @@ class OAuthTokenMonitoringExecutor(TaskExecutor):
} }
# Clear next_check - task won't run automatically # Clear next_check - task won't run automatically
task.next_check = None task.next_check = None
task.next_retry_at = None
task.channel_status = "disconnected"
task.terminal_failure_reason = result.error_message
self.logger.warning( self.logger.warning(
f"Task {task.id} marked for human intervention: " f"Task {task.id} marked for human intervention: "
@@ -133,10 +143,17 @@ class OAuthTokenMonitoringExecutor(TaskExecutor):
f"reason: {pattern.failure_reason.value}" f"reason: {pattern.failure_reason.value}"
) )
else: else:
# Normal failure handling
task.status = 'failed'
task.consecutive_failures = (task.consecutive_failures or 0) + 1 task.consecutive_failures = (task.consecutive_failures or 0) + 1
# Do NOT update next_check - wait for manual trigger if task.refresh_attempts >= self.max_refresh_retries:
task.status = 'failed'
task.channel_status = 'disconnected'
task.terminal_failure_reason = result.error_message
task.next_retry_at = None
else:
task.status = 'degraded'
task.channel_status = 'degraded'
delay_minutes = self.base_retry_backoff_minutes * (2 ** (task.refresh_attempts - 1))
task.next_retry_at = datetime.utcnow() + timedelta(minutes=delay_minutes)
self.logger.warning( self.logger.warning(
f"OAuth token refresh failed for user {user_id}, platform {platform}. " f"OAuth token refresh failed for user {user_id}, platform {platform}. "
@@ -144,7 +161,7 @@ class OAuthTokenMonitoringExecutor(TaskExecutor):
) )
# Create UsageAlert notification for the user # Create UsageAlert notification for the user
self._create_failure_alert(user_id, platform, result.error_message, result.result_data, db) self._create_failure_alert(user_id, platform, result.error_message, result.result_data, db, task)
task.updated_at = datetime.utcnow() task.updated_at = datetime.utcnow()
db.commit() db.commit()
@@ -193,12 +210,14 @@ class OAuthTokenMonitoringExecutor(TaskExecutor):
task.last_failure = datetime.utcnow() task.last_failure = datetime.utcnow()
task.failure_reason = str(e) task.failure_reason = str(e)
task.status = 'failed' task.status = 'failed'
task.channel_status = 'disconnected'
task.terminal_failure_reason = str(e)
task.last_check = datetime.utcnow() task.last_check = datetime.utcnow()
task.updated_at = datetime.utcnow() task.updated_at = datetime.utcnow()
# Do NOT update next_check - wait for manual trigger task.next_retry_at = None
# Create UsageAlert notification for the user # Create UsageAlert notification for the user
self._create_failure_alert(user_id, task.platform, str(e), None, db) self._create_failure_alert(user_id, task.platform, str(e), None, db, task)
db.commit() db.commit()
except Exception as commit_error: except Exception as commit_error:
@@ -651,7 +670,8 @@ class OAuthTokenMonitoringExecutor(TaskExecutor):
platform: str, platform: str,
error_message: str, error_message: str,
result_data: Optional[Dict[str, Any]], result_data: Optional[Dict[str, Any]],
db: Session db: Session,
task: Optional[OAuthTokenMonitoringTask] = None
): ):
""" """
Create a UsageAlert notification when OAuth token refresh fails. Create a UsageAlert notification when OAuth token refresh fails.
@@ -724,6 +744,20 @@ class OAuthTokenMonitoringExecutor(TaskExecutor):
from datetime import datetime from datetime import datetime
billing_period = datetime.utcnow().strftime("%Y-%m") billing_period = datetime.utcnow().strftime("%Y-%m")
alert_payload = {
"requires_user_action": True,
"platform": platform,
"channel_status": getattr(task, "channel_status", "disconnected"),
"terminal_failure_reason": getattr(task, "terminal_failure_reason", error_message),
"next_retry_at": (
task.next_retry_at.isoformat() if task and task.next_retry_at else None
),
"refresh_attempts": getattr(task, "refresh_attempts", 0),
"max_refresh_retries": self.max_refresh_retries,
}
message = f"{message} [ALERT_PAYLOAD] {alert_payload}"
# Create UsageAlert # Create UsageAlert
alert = UsageAlert( alert = UsageAlert(
user_id=user_id, user_id=user_id,
@@ -786,4 +820,3 @@ class OAuthTokenMonitoringExecutor(TaskExecutor):
f"Defaulting to Weekly (7 days)." f"Defaulting to Weekly (7 days)."
) )
return last_execution + timedelta(days=7) return last_execution + timedelta(days=7)

View File

@@ -1,271 +0,0 @@
"""Self-healing executor for social post engagement recovery.
Implements:
- Per-post evaluation windows and cooldown timers
- Stagnation trigger evaluation with tiered action selection
- Action idempotency keys for edit/comment/thread operations
- Duplicate and over-frequency suppression within cooldown boundaries
- Outcome persistence and safe retry policy for transient failures
"""
from __future__ import annotations
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta, timezone
from enum import Enum
import hashlib
import json
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
class ActionType(str, Enum):
EDIT = "edit"
COMMENT = "comment"
THREAD = "thread"
class ActionTier(str, Enum):
TIER_1 = "tier_1" # low-intensity nudge (comment)
TIER_2 = "tier_2" # medium-intensity enhancement (edit)
TIER_3 = "tier_3" # high-intensity amplification (thread)
SAFE_TRANSIENT_ERROR_CODES = {
"timeout",
"rate_limit",
"service_unavailable",
"network_error",
}
@dataclass
class EvaluationConfig:
per_post_window_minutes: int = 90
min_samples_required: int = 3
cooldown_by_action_seconds: Dict[ActionType, int] = field(
default_factory=lambda: {
ActionType.COMMENT: 30 * 60,
ActionType.EDIT: 2 * 60 * 60,
ActionType.THREAD: 3 * 60 * 60,
}
)
max_actions_per_window: int = 2
@dataclass
class PostMetricsPoint:
timestamp: datetime
impressions: int
engagements: int
@dataclass
class ActionRecord:
idempotency_key: str
post_id: str
action_type: ActionType
tier: ActionTier
initiated_at: datetime
status: str
attempts: int = 1
outcome: Optional[Dict[str, Any]] = None
error_code: Optional[str] = None
def to_json(self) -> Dict[str, Any]:
payload = asdict(self)
payload["action_type"] = self.action_type.value
payload["tier"] = self.tier.value
payload["initiated_at"] = self.initiated_at.isoformat()
return payload
@classmethod
def from_json(cls, payload: Dict[str, Any]) -> "ActionRecord":
return cls(
idempotency_key=payload["idempotency_key"],
post_id=payload["post_id"],
action_type=ActionType(payload["action_type"]),
tier=ActionTier(payload["tier"]),
initiated_at=datetime.fromisoformat(payload["initiated_at"]),
status=payload["status"],
attempts=payload.get("attempts", 1),
outcome=payload.get("outcome"),
error_code=payload.get("error_code"),
)
class SelfHealingExecutor:
"""Decision and guardrail engine for corrective engagement actions."""
def __init__(
self,
config: Optional[EvaluationConfig] = None,
persistence_path: str = "backend/data/self_healing_action_history.json",
) -> None:
self.config = config or EvaluationConfig()
self.persistence_path = Path(persistence_path)
self._history: List[ActionRecord] = self._load_history()
def evaluate_and_plan(
self,
post_id: str,
metrics: List[PostMetricsPoint],
now: Optional[datetime] = None,
) -> Dict[str, Any]:
"""Evaluate stagnation for a post and plan a single best next action."""
now = now or datetime.now(timezone.utc)
window_metrics = self._filter_window(metrics, now)
if len(window_metrics) < self.config.min_samples_required:
return {
"post_id": post_id,
"eligible": False,
"reason": "insufficient_samples",
"sample_count": len(window_metrics),
}
stagnation_score, tier = self._evaluate_stagnation(window_metrics)
action_type = self._choose_action_type(tier)
idempotency_key = self.generate_idempotency_key(post_id, action_type, tier)
if self._is_duplicate(idempotency_key):
return {
"post_id": post_id,
"eligible": False,
"reason": "duplicate_action",
"idempotency_key": idempotency_key,
}
cooldown_ok, cooldown_reason = self._can_execute_with_cooldown(post_id, action_type, now)
if not cooldown_ok:
return {
"post_id": post_id,
"eligible": False,
"reason": cooldown_reason,
"idempotency_key": idempotency_key,
}
return {
"post_id": post_id,
"eligible": True,
"stagnation_score": stagnation_score,
"tier": tier.value,
"action_type": action_type.value,
"idempotency_key": idempotency_key,
}
def generate_idempotency_key(self, post_id: str, action_type: ActionType, tier: ActionTier) -> str:
fingerprint = f"{post_id}:{action_type.value}:{tier.value}".encode("utf-8")
digest = hashlib.sha256(fingerprint).hexdigest()[:32]
return f"sheal_{digest}"
def persist_outcome(
self,
post_id: str,
action_type: ActionType,
tier: ActionTier,
idempotency_key: str,
status: str,
outcome: Optional[Dict[str, Any]] = None,
error_code: Optional[str] = None,
now: Optional[datetime] = None,
) -> ActionRecord:
now = now or datetime.now(timezone.utc)
existing = next((h for h in self._history if h.idempotency_key == idempotency_key), None)
if existing:
existing.status = status
existing.outcome = outcome
existing.error_code = error_code
existing.attempts += 1
existing.initiated_at = now
record = existing
else:
record = ActionRecord(
idempotency_key=idempotency_key,
post_id=post_id,
action_type=action_type,
tier=tier,
initiated_at=now,
status=status,
outcome=outcome,
error_code=error_code,
)
self._history.append(record)
self._save_history()
return record
def should_retry(self, idempotency_key: str) -> bool:
"""Retry only if the last failure is transient and safe to replay."""
rec = next((h for h in self._history if h.idempotency_key == idempotency_key), None)
if not rec or rec.status != "failed":
return False
if rec.error_code not in SAFE_TRANSIENT_ERROR_CODES:
return False
return rec.action_type in {ActionType.COMMENT, ActionType.EDIT, ActionType.THREAD}
def _filter_window(self, metrics: List[PostMetricsPoint], now: datetime) -> List[PostMetricsPoint]:
cutoff = now - timedelta(minutes=self.config.per_post_window_minutes)
return [m for m in metrics if m.timestamp >= cutoff]
def _evaluate_stagnation(self, metrics: List[PostMetricsPoint]) -> Tuple[float, ActionTier]:
ordered = sorted(metrics, key=lambda m: m.timestamp)
first, last = ordered[0], ordered[-1]
imp_delta = max(0, last.impressions - first.impressions)
eng_delta = max(0, last.engagements - first.engagements)
eng_rate = eng_delta / imp_delta if imp_delta > 0 else 0.0
stagnation_score = 1.0 - min(1.0, eng_rate * 20)
if stagnation_score >= 0.8:
return stagnation_score, ActionTier.TIER_3
if stagnation_score >= 0.55:
return stagnation_score, ActionTier.TIER_2
return stagnation_score, ActionTier.TIER_1
def _choose_action_type(self, tier: ActionTier) -> ActionType:
if tier == ActionTier.TIER_1:
return ActionType.COMMENT
if tier == ActionTier.TIER_2:
return ActionType.EDIT
return ActionType.THREAD
def _is_duplicate(self, idempotency_key: str) -> bool:
return any(h.idempotency_key == idempotency_key and h.status in {"success", "running"} for h in self._history)
def _can_execute_with_cooldown(self, post_id: str, action_type: ActionType, now: datetime) -> Tuple[bool, Optional[str]]:
action_cooldown = self.config.cooldown_by_action_seconds[action_type]
same_post = [h for h in self._history if h.post_id == post_id]
recent_in_window = [
h for h in same_post
if h.initiated_at >= now - timedelta(minutes=self.config.per_post_window_minutes)
]
if len(recent_in_window) >= self.config.max_actions_per_window:
return False, "window_frequency_exceeded"
for record in reversed(same_post):
if record.action_type != action_type:
continue
if (now - record.initiated_at).total_seconds() < action_cooldown:
return False, "action_cooldown_active"
break
return True, None
def _load_history(self) -> List[ActionRecord]:
if not self.persistence_path.exists():
return []
try:
payload = json.loads(self.persistence_path.read_text(encoding="utf-8"))
return [ActionRecord.from_json(item) for item in payload]
except (json.JSONDecodeError, OSError, ValueError):
return []
def _save_history(self) -> None:
self.persistence_path.parent.mkdir(parents=True, exist_ok=True)
payload = [item.to_json() for item in self._history]
self.persistence_path.write_text(json.dumps(payload, indent=2), encoding="utf-8")

View File

@@ -3,7 +3,7 @@ OAuth Token Monitoring Task Loader
Functions to load due OAuth token monitoring tasks from database. Functions to load due OAuth token monitoring tasks from database.
""" """
from datetime import datetime from datetime import datetime, timedelta
from typing import List, Optional, Union from typing import List, Optional, Union
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from sqlalchemy import and_, or_ from sqlalchemy import and_, or_
@@ -52,3 +52,34 @@ def load_due_oauth_token_monitoring_tasks(
return query.all() return query.all()
def load_near_expiry_oauth_token_tasks(
db: Session,
refresh_horizon_hours: int = 24,
user_id: Optional[Union[str, int]] = None
) -> List[OAuthTokenMonitoringTask]:
"""
Load OAuth tasks that should run token refresh logic soon.
Includes:
- tasks with a scheduled retry now due (next_retry_at <= now)
- tasks whose routine check is inside the near-expiry horizon window
"""
now = datetime.utcnow()
horizon = now + timedelta(hours=max(refresh_horizon_hours, 1))
query = db.query(OAuthTokenMonitoringTask).filter(
and_(
OAuthTokenMonitoringTask.status.in_(['active', 'failed', 'degraded']),
or_(
OAuthTokenMonitoringTask.next_retry_at <= now,
OAuthTokenMonitoringTask.next_check <= horizon,
OAuthTokenMonitoringTask.next_check.is_(None)
)
)
)
if user_id is not None:
query = query.filter(OAuthTokenMonitoringTask.user_id == str(user_id))
return query.all()