moreminimore-marketing/backend/api/scheduler_dashboard.py
Kunthawat Greethong c35fa52117 Base code
2026-01-08 22:39:53 +07:00

"""
Scheduler Dashboard API
Provides endpoints for scheduler dashboard UI.
"""
from fastapi import APIRouter, HTTPException, Depends, Query
from typing import Dict, Any, Optional, List
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import desc, func
from datetime import datetime
from loguru import logger
from services.scheduler import get_scheduler
from services.scheduler.utils.user_job_store import get_user_job_store_name
from services.monitoring_data_service import MonitoringDataService
from services.database import get_db
from middleware.auth_middleware import get_current_user
from models.monitoring_models import TaskExecutionLog, MonitoringTask
from models.scheduler_models import SchedulerEventLog
from models.oauth_token_monitoring_models import OAuthTokenMonitoringTask
from models.platform_insights_monitoring_models import PlatformInsightsTask, PlatformInsightsExecutionLog
from models.website_analysis_monitoring_models import WebsiteAnalysisTask, WebsiteAnalysisExecutionLog
router = APIRouter(prefix="/api/scheduler", tags=["scheduler-dashboard"])
def _rebuild_cumulative_stats_from_events(db: Session) -> Dict[str, int]:
"""
Rebuild cumulative stats by aggregating all check_cycle events from event logs.
This is used as a fallback when the cumulative stats table doesn't exist or is invalid.
Args:
db: Database session
Returns:
Dictionary with cumulative stats
"""
try:
# Aggregate check cycle events for cumulative totals
result = db.query(
func.count(SchedulerEventLog.id),
func.sum(SchedulerEventLog.tasks_found),
func.sum(SchedulerEventLog.tasks_executed),
func.sum(SchedulerEventLog.tasks_failed)
).filter(
SchedulerEventLog.event_type == 'check_cycle'
).first()
if result:
# SQLAlchemy returns tuple for multi-column queries
# SUM returns NULL when no rows, handle that
total_cycles = result[0] if result[0] is not None else 0
total_found = result[1] if result[1] is not None else 0
total_executed = result[2] if result[2] is not None else 0
total_failed = result[3] if result[3] is not None else 0
return {
'total_check_cycles': int(total_cycles),
'cumulative_tasks_found': int(total_found),
'cumulative_tasks_executed': int(total_executed),
'cumulative_tasks_failed': int(total_failed),
'cumulative_tasks_skipped': 0 # Not tracked in event logs currently
}
else:
return {
'total_check_cycles': 0,
'cumulative_tasks_found': 0,
'cumulative_tasks_executed': 0,
'cumulative_tasks_failed': 0,
'cumulative_tasks_skipped': 0
}
except Exception as e:
logger.error(f"[Dashboard] Error rebuilding cumulative stats from events: {e}", exc_info=True)
return {
'total_check_cycles': 0,
'cumulative_tasks_found': 0,
'cumulative_tasks_executed': 0,
'cumulative_tasks_failed': 0,
'cumulative_tasks_skipped': 0
}
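# For reference, the aggregate above corresponds roughly to the following SQL
# (sketch; the table name is assumed from the SchedulerEventLog model):
#
#   SELECT COUNT(id), SUM(tasks_found), SUM(tasks_executed), SUM(tasks_failed)
#   FROM scheduler_event_logs
#   WHERE event_type = 'check_cycle';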
@router.get("/dashboard")
async def get_scheduler_dashboard(
current_user: Dict[str, Any] = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""
Get scheduler dashboard statistics and current state.
Returns:
- Scheduler stats (total checks, tasks executed, failed, etc.)
- Current scheduled jobs
- Active strategies count
- Check interval
- User isolation status
- Last check timestamp
"""
try:
scheduler = get_scheduler()
# Get user_id from current_user (Clerk format)
user_id_str = str(current_user.get('id', '')) if current_user else None
# Get scheduler stats
stats = scheduler.get_stats(user_id=None) # Get all stats for dashboard
# Get all scheduled jobs
all_jobs = scheduler.scheduler.get_jobs()
# Format jobs with user context
formatted_jobs = []
for job in all_jobs:
job_info = {
'id': job.id,
'trigger_type': type(job.trigger).__name__,
'next_run_time': job.next_run_time.isoformat() if job.next_run_time else None,
'user_id': None,
'job_store': 'default',
'user_job_store': 'default'
}
# Extract user_id from job
user_id_from_job = None
if hasattr(job, 'kwargs') and job.kwargs and job.kwargs.get('user_id'):
user_id_from_job = job.kwargs.get('user_id')
elif job.id and ('research_persona_' in job.id or 'facebook_persona_' in job.id):
parts = job.id.split('_')
if len(parts) >= 3:
user_id_from_job = parts[2]
if user_id_from_job:
job_info['user_id'] = user_id_from_job
try:
user_job_store = get_user_job_store_name(user_id_from_job, db)
job_info['user_job_store'] = user_job_store
except Exception as e:
logger.debug(f"Could not get job store for user {user_id_from_job}: {e}")
formatted_jobs.append(job_info)
# Add OAuth token monitoring tasks from database (these are recurring weekly tasks)
try:
oauth_tasks = db.query(OAuthTokenMonitoringTask).filter(
OAuthTokenMonitoringTask.status == 'active'
).all()
oauth_tasks_count = len(oauth_tasks)
if oauth_tasks_count > 0:
# Log platform breakdown for debugging
platforms = {}
for task in oauth_tasks:
platforms[task.platform] = platforms.get(task.platform, 0) + 1
platform_summary = ", ".join([f"{platform}: {count}" for platform, count in platforms.items()])
logger.warning(
f"[Dashboard] OAuth Monitoring: Found {oauth_tasks_count} active OAuth token monitoring tasks "
f"({platform_summary})"
)
else:
# Check if there are any inactive tasks
all_oauth_tasks = db.query(OAuthTokenMonitoringTask).all()
if all_oauth_tasks:
inactive_by_status = {}
for task in all_oauth_tasks:
status = task.status
inactive_by_status[status] = inactive_by_status.get(status, 0) + 1
logger.warning(
f"[Dashboard] OAuth Monitoring: Found {len(all_oauth_tasks)} total OAuth tasks, "
f"but {oauth_tasks_count} are active. Status breakdown: {inactive_by_status}"
)
for task in oauth_tasks:
try:
user_job_store = get_user_job_store_name(task.user_id, db)
except Exception as e:
user_job_store = 'default'
logger.debug(f"Could not get job store for user {task.user_id}: {e}")
# Format as recurring weekly job
job_info = {
'id': f"oauth_token_monitoring_{task.platform}_{task.user_id}",
'trigger_type': 'CronTrigger', # Weekly recurring
'next_run_time': task.next_check.isoformat() if task.next_check else None,
'user_id': task.user_id,
'job_store': 'default',
'user_job_store': user_job_store,
'function_name': 'oauth_token_monitoring_executor.execute_task',
'platform': task.platform,
'task_id': task.id,
'is_database_task': True, # Flag to indicate this is a DB task, not APScheduler job
'frequency': 'Weekly'
}
formatted_jobs.append(job_info)
except Exception as e:
logger.error(f"Error loading OAuth token monitoring tasks: {e}", exc_info=True)
# Load website analysis tasks
try:
website_analysis_tasks = db.query(WebsiteAnalysisTask).filter(
WebsiteAnalysisTask.status == 'active'
).all()
# Filter by user if user_id_str is provided
if user_id_str:
website_analysis_tasks = [t for t in website_analysis_tasks if t.user_id == user_id_str]
for task in website_analysis_tasks:
try:
user_job_store = get_user_job_store_name(task.user_id, db)
except Exception as e:
user_job_store = 'default'
logger.debug(f"Could not get job store for user {task.user_id}: {e}")
# Format as recurring job
job_info = {
'id': f"website_analysis_{task.task_type}_{task.user_id}_{task.id}",
'trigger_type': 'CronTrigger', # Recurring based on frequency_days
'next_run_time': task.next_check.isoformat() if task.next_check else None,
'user_id': task.user_id,
'job_store': 'default',
'user_job_store': user_job_store,
'function_name': 'website_analysis_executor.execute_task',
'task_type': task.task_type, # 'user_website' or 'competitor'
'website_url': task.website_url,
'competitor_id': task.competitor_id,
'task_id': task.id,
'is_database_task': True,
'frequency': f'Every {task.frequency_days} days',
'task_category': 'website_analysis'
}
formatted_jobs.append(job_info)
except Exception as e:
logger.error(f"Error loading website analysis tasks: {e}", exc_info=True)
# Load platform insights tasks (GSC and Bing)
try:
insights_tasks = db.query(PlatformInsightsTask).filter(
PlatformInsightsTask.status == 'active'
).all()
# Filter by user if user_id_str is provided
if user_id_str:
insights_tasks = [t for t in insights_tasks if t.user_id == user_id_str]
for task in insights_tasks:
try:
user_job_store = get_user_job_store_name(task.user_id, db)
except Exception as e:
user_job_store = 'default'
logger.debug(f"Could not get job store for user {task.user_id}: {e}")
# Format as recurring weekly job
job_info = {
'id': f"platform_insights_{task.platform}_{task.user_id}",
'trigger_type': 'CronTrigger', # Weekly recurring
'next_run_time': task.next_check.isoformat() if task.next_check else None,
'user_id': task.user_id,
'job_store': 'default',
'user_job_store': user_job_store,
'function_name': f'{task.platform}_insights_executor.execute_task',
'platform': task.platform,
'task_id': task.id,
'is_database_task': True,
'frequency': 'Weekly',
'task_category': 'platform_insights'
}
formatted_jobs.append(job_info)
except Exception as e:
logger.error(f"Error loading platform insights tasks: {e}", exc_info=True)
# Get active strategies count
active_strategies = stats.get('active_strategies_count', 0)
# Get last_update from stats (added by scheduler for frontend polling)
last_update = stats.get('last_update')
# Calculate cumulative/historical values from persistent cumulative stats table
# Fallback to event logs aggregation if cumulative stats table doesn't exist or is invalid
cumulative_stats = {}
try:
from models.scheduler_cumulative_stats_model import SchedulerCumulativeStats
# Try to get cumulative stats from dedicated table (persistent across restarts)
cumulative_stats_row = db.query(SchedulerCumulativeStats).filter(
SchedulerCumulativeStats.id == 1
).first()
if cumulative_stats_row:
# Use persistent cumulative stats
cumulative_stats = {
'total_check_cycles': int(cumulative_stats_row.total_check_cycles or 0),
'cumulative_tasks_found': int(cumulative_stats_row.cumulative_tasks_found or 0),
'cumulative_tasks_executed': int(cumulative_stats_row.cumulative_tasks_executed or 0),
'cumulative_tasks_failed': int(cumulative_stats_row.cumulative_tasks_failed or 0),
'cumulative_tasks_skipped': int(cumulative_stats_row.cumulative_tasks_skipped or 0),
'cumulative_job_completed': int(cumulative_stats_row.cumulative_job_completed or 0),
'cumulative_job_failed': int(cumulative_stats_row.cumulative_job_failed or 0)
}
logger.debug(
f"[Dashboard] Using persistent cumulative stats: "
f"cycles={cumulative_stats['total_check_cycles']}, "
f"found={cumulative_stats['cumulative_tasks_found']}, "
f"executed={cumulative_stats['cumulative_tasks_executed']}, "
f"failed={cumulative_stats['cumulative_tasks_failed']}"
)
# Validate cumulative stats by comparing with event logs (for verification)
check_cycle_count = db.query(func.count(SchedulerEventLog.id)).filter(
SchedulerEventLog.event_type == 'check_cycle'
).scalar() or 0
if cumulative_stats['total_check_cycles'] != check_cycle_count:
logger.warning(
f"[Dashboard] ⚠️ Cumulative stats validation mismatch: "
f"cumulative_stats.total_check_cycles={cumulative_stats['total_check_cycles']} "
f"vs event_logs.count={check_cycle_count}. "
f"Rebuilding cumulative stats from event logs..."
)
# Rebuild cumulative stats from event logs
cumulative_stats = _rebuild_cumulative_stats_from_events(db)
# Update the persistent table
if cumulative_stats_row:
cumulative_stats_row.total_check_cycles = cumulative_stats['total_check_cycles']
cumulative_stats_row.cumulative_tasks_found = cumulative_stats['cumulative_tasks_found']
cumulative_stats_row.cumulative_tasks_executed = cumulative_stats['cumulative_tasks_executed']
cumulative_stats_row.cumulative_tasks_failed = cumulative_stats['cumulative_tasks_failed']
cumulative_stats_row.cumulative_tasks_skipped = cumulative_stats.get('cumulative_tasks_skipped', 0)
db.commit()
logger.warning(f"[Dashboard] ✅ Rebuilt cumulative stats: {cumulative_stats}")
else:
# Cumulative stats table doesn't exist or is empty, rebuild from event logs
logger.warning(
"[Dashboard] Cumulative stats table not found or empty. "
"Rebuilding from event logs..."
)
cumulative_stats = _rebuild_cumulative_stats_from_events(db)
# Create/update the persistent table
cumulative_stats_row = SchedulerCumulativeStats.get_or_create(db)
cumulative_stats_row.total_check_cycles = cumulative_stats['total_check_cycles']
cumulative_stats_row.cumulative_tasks_found = cumulative_stats['cumulative_tasks_found']
cumulative_stats_row.cumulative_tasks_executed = cumulative_stats['cumulative_tasks_executed']
cumulative_stats_row.cumulative_tasks_failed = cumulative_stats['cumulative_tasks_failed']
cumulative_stats_row.cumulative_tasks_skipped = cumulative_stats.get('cumulative_tasks_skipped', 0)
db.commit()
logger.warning(f"[Dashboard] ✅ Created/updated cumulative stats: {cumulative_stats}")
except ImportError:
# Cumulative stats model doesn't exist yet (migration not run)
logger.warning(
"[Dashboard] Cumulative stats model not found. "
"Falling back to event logs aggregation. "
"Run migration: create_scheduler_cumulative_stats.sql"
)
cumulative_stats = _rebuild_cumulative_stats_from_events(db)
except Exception as e:
logger.error(f"[Dashboard] Error getting cumulative stats: {e}", exc_info=True)
# Fallback to event logs aggregation
cumulative_stats = _rebuild_cumulative_stats_from_events(db)
return {
'stats': {
# Current session stats (from scheduler memory)
'total_checks': stats.get('total_checks', 0),
'tasks_found': stats.get('tasks_found', 0),
'tasks_executed': stats.get('tasks_executed', 0),
'tasks_failed': stats.get('tasks_failed', 0),
'tasks_skipped': stats.get('tasks_skipped', 0),
'last_check': stats.get('last_check'),
'last_update': last_update, # Include for frontend polling
'active_executions': stats.get('active_executions', 0),
'running': stats.get('running', False),
'check_interval_minutes': stats.get('check_interval_minutes', 60),
'min_check_interval_minutes': stats.get('min_check_interval_minutes', 15),
'max_check_interval_minutes': stats.get('max_check_interval_minutes', 60),
'intelligent_scheduling': stats.get('intelligent_scheduling', True),
'active_strategies_count': active_strategies,
'last_interval_adjustment': stats.get('last_interval_adjustment'),
'registered_types': stats.get('registered_types', []),
# Cumulative/historical stats (from database)
'cumulative_total_check_cycles': cumulative_stats.get('total_check_cycles', 0),
'cumulative_tasks_found': cumulative_stats.get('cumulative_tasks_found', 0),
'cumulative_tasks_executed': cumulative_stats.get('cumulative_tasks_executed', 0),
'cumulative_tasks_failed': cumulative_stats.get('cumulative_tasks_failed', 0)
},
'jobs': formatted_jobs,
'job_count': len(formatted_jobs),
'recurring_jobs': 1 + len([j for j in formatted_jobs if j.get('is_database_task')]), # check_due_tasks + all DB tasks
'one_time_jobs': len([j for j in formatted_jobs if not j.get('is_database_task') and j.get('trigger_type') == 'DateTrigger']),
'registered_task_types': stats.get('registered_types', []), # Include registered task types
'user_isolation': {
'enabled': True,
'current_user_id': user_id_str
},
'last_updated': datetime.utcnow().isoformat() # Keep for backward compatibility
}
except Exception as e:
logger.error(f"Error getting scheduler dashboard: {e}")
raise HTTPException(status_code=500, detail=f"Failed to get scheduler dashboard: {str(e)}")
@router.get("/execution-logs")
async def get_execution_logs(
limit: int = Query(50, ge=1, le=500),
offset: int = Query(0, ge=0),
status: Optional[str] = Query(None, regex="^(success|failed|running|skipped)$"),
current_user: Dict[str, Any] = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""
Get task execution logs from database.
Query Params:
- limit: Number of logs to return (1-500, default: 50)
- offset: Pagination offset (default: 0)
- status: Filter by status (success, failed, running, skipped)
Returns:
- List of execution logs with task details
- Total count for pagination
"""
try:
# Get user_id from current_user (Clerk user IDs are strings; kept as a string here)
user_id_str = str(current_user.get('id', '')) if current_user else None
# Check if user_id column exists in the database
from sqlalchemy import inspect
inspector = inspect(db.bind)
columns = [col['name'] for col in inspector.get_columns('task_execution_logs')]
has_user_id_column = 'user_id' in columns
# If user_id column doesn't exist, we need to handle the query differently
# to avoid SQLAlchemy trying to access a non-existent column
if not has_user_id_column:
# Query without user_id column - use explicit column selection
from sqlalchemy import func
# Build query for count
count_query = db.query(func.count(TaskExecutionLog.id)).join(
MonitoringTask,
TaskExecutionLog.task_id == MonitoringTask.id
)
# Filter by status if provided
if status:
count_query = count_query.filter(TaskExecutionLog.status == status)
total_count = count_query.scalar() or 0
# Build query for data - select specific columns to avoid user_id
query = db.query(
TaskExecutionLog.id,
TaskExecutionLog.task_id,
TaskExecutionLog.execution_date,
TaskExecutionLog.status,
TaskExecutionLog.result_data,
TaskExecutionLog.error_message,
TaskExecutionLog.execution_time_ms,
TaskExecutionLog.created_at,
MonitoringTask
).join(
MonitoringTask,
TaskExecutionLog.task_id == MonitoringTask.id
)
# Filter by status if provided
if status:
query = query.filter(TaskExecutionLog.status == status)
# Get paginated results
logs = query.order_by(TaskExecutionLog.execution_date.desc()).offset(offset).limit(limit).all()
# Format results for compatibility
formatted_logs = []
for log_tuple in logs:
# Unpack the tuple
log_id, task_id, execution_date, log_status, result_data, error_message, execution_time_ms, created_at, task = log_tuple
log_data = {
'id': log_id,
'task_id': task_id,
'user_id': None, # No user_id column in database
'execution_date': execution_date.isoformat() if execution_date else None,
'status': log_status,
'error_message': error_message,
'execution_time_ms': execution_time_ms,
'result_data': result_data,
'created_at': created_at.isoformat() if created_at else None
}
# Add task details
if task:
log_data['task'] = {
'id': task.id,
'task_title': task.task_title,
'component_name': task.component_name,
'metric': task.metric,
'frequency': task.frequency
}
formatted_logs.append(log_data)
return {
'logs': formatted_logs,
'total_count': total_count,
'limit': limit,
'offset': offset,
'has_more': (offset + limit) < total_count,
'is_scheduler_logs': False # Explicitly mark as execution logs, not scheduler logs
}
# If user_id column exists, use the normal query path
# Build query with eager loading of task relationship
query = db.query(TaskExecutionLog).join(
MonitoringTask,
TaskExecutionLog.task_id == MonitoringTask.id
).options(
joinedload(TaskExecutionLog.task)
)
# Filter by status if provided
if status:
query = query.filter(TaskExecutionLog.status == status)
# Filter by user_id if provided (for user isolation)
if user_id_str and has_user_id_column:
# Note: user_id in TaskExecutionLog is Integer, but we have Clerk string
# For now, get all logs - can enhance later with user_id mapping
pass
# Get total count
total_count = query.count()
# Get paginated results
logs = query.order_by(desc(TaskExecutionLog.execution_date)).offset(offset).limit(limit).all()
# Format results
formatted_logs = []
for log in logs:
log_data = {
'id': log.id,
'task_id': log.task_id,
'user_id': log.user_id if has_user_id_column else None,
'execution_date': log.execution_date.isoformat() if log.execution_date else None,
'status': log.status,
'error_message': log.error_message,
'execution_time_ms': log.execution_time_ms,
'result_data': log.result_data,
'created_at': log.created_at.isoformat() if log.created_at else None
}
# Add task details if available
if log.task:
log_data['task'] = {
'id': log.task.id,
'task_title': log.task.task_title,
'component_name': log.task.component_name,
'metric': log.task.metric,
'frequency': log.task.frequency
}
formatted_logs.append(log_data)
return {
'logs': formatted_logs,
'total_count': total_count,
'limit': limit,
'offset': offset,
'has_more': (offset + limit) < total_count,
'is_scheduler_logs': False # Explicitly mark as execution logs, not scheduler logs
}
except Exception as e:
logger.error(f"Error getting execution logs: {e}")
raise HTTPException(status_code=500, detail=f"Failed to get execution logs: {str(e)}")
@router.get("/jobs")
async def get_scheduler_jobs(
current_user: Dict[str, Any] = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""
Get detailed information about all scheduled jobs.
Returns:
- List of jobs with detailed information
- Job ID, trigger type, next run time
- User context (extracted from job ID/kwargs)
- Job store name (from user's website root)
"""
try:
scheduler = get_scheduler()
all_jobs = scheduler.scheduler.get_jobs()
formatted_jobs = []
for job in all_jobs:
job_info = {
'id': job.id,
'trigger_type': type(job.trigger).__name__,
'next_run_time': job.next_run_time.isoformat() if job.next_run_time else None,
'jobstore': getattr(job, 'jobstore', 'default'),
'user_id': None,
'user_job_store': 'default',
'function_name': None
}
# Extract user_id from job
user_id_from_job = None
if hasattr(job, 'kwargs') and job.kwargs and job.kwargs.get('user_id'):
user_id_from_job = job.kwargs.get('user_id')
elif job.id and ('research_persona_' in job.id or 'facebook_persona_' in job.id):
parts = job.id.split('_')
if len(parts) >= 3:
user_id_from_job = parts[2]
if user_id_from_job:
job_info['user_id'] = user_id_from_job
try:
user_job_store = get_user_job_store_name(user_id_from_job, db)
job_info['user_job_store'] = user_job_store
except Exception as e:
logger.debug(f"Could not get job store for user {user_id_from_job}: {e}")
# Get function name if available
if hasattr(job, 'func') and hasattr(job.func, '__name__'):
job_info['function_name'] = job.func.__name__
elif hasattr(job, 'func_ref'):
job_info['function_name'] = str(job.func_ref)
formatted_jobs.append(job_info)
return {
'jobs': formatted_jobs,
'total_jobs': len(formatted_jobs),
'recurring_jobs': 1, # check_due_tasks
'one_time_jobs': len(formatted_jobs) - 1
}
except Exception as e:
logger.error(f"Error getting scheduler jobs: {e}")
raise HTTPException(status_code=500, detail=f"Failed to get scheduler jobs: {str(e)}")
@router.get("/event-history")
async def get_scheduler_event_history(
limit: int = Query(100, ge=1, le=1000),
offset: int = Query(0, ge=0),
event_type: Optional[str] = Query(None, regex="^(check_cycle|interval_adjustment|start|stop|job_scheduled|job_cancelled|job_completed|job_failed)$"),
current_user: Dict[str, Any] = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""
Get scheduler event history from database.
This endpoint returns historical scheduler events such as:
- Check cycles (when scheduler runs and checks for due tasks)
- Interval adjustments (when check interval changes)
- Scheduler start/stop events
- Job scheduled/cancelled events
Query Params:
- limit: Number of events to return (1-1000, default: 100)
- offset: Pagination offset (default: 0)
- event_type: Filter by event type (check_cycle, interval_adjustment, start, stop, etc.)
Returns:
- List of scheduler events with details
- Total count for pagination
"""
try:
# Build query
query = db.query(SchedulerEventLog)
# Filter by event type if provided
if event_type:
query = query.filter(SchedulerEventLog.event_type == event_type)
# Get total count
total_count = query.count()
# Get paginated results (most recent first)
events = query.order_by(desc(SchedulerEventLog.event_date)).offset(offset).limit(limit).all()
# Format results
formatted_events = []
for event in events:
event_data = {
'id': event.id,
'event_type': event.event_type,
'event_date': event.event_date.isoformat() if event.event_date else None,
'check_cycle_number': event.check_cycle_number,
'check_interval_minutes': event.check_interval_minutes,
'previous_interval_minutes': event.previous_interval_minutes,
'new_interval_minutes': event.new_interval_minutes,
'tasks_found': event.tasks_found,
'tasks_executed': event.tasks_executed,
'tasks_failed': event.tasks_failed,
'tasks_by_type': event.tasks_by_type,
'check_duration_seconds': event.check_duration_seconds,
'active_strategies_count': event.active_strategies_count,
'active_executions': event.active_executions,
'job_id': event.job_id,
'job_type': event.job_type,
'user_id': event.user_id,
'event_data': event.event_data,
'error_message': event.error_message,
'created_at': event.created_at.isoformat() if event.created_at else None
}
formatted_events.append(event_data)
return {
'events': formatted_events,
'total_count': total_count,
'limit': limit,
'offset': offset,
'has_more': (offset + limit) < total_count
}
except Exception as e:
logger.error(f"Error getting scheduler event history: {e}")
raise HTTPException(status_code=500, detail=f"Failed to get scheduler event history: {str(e)}")
@router.get("/recent-scheduler-logs")
async def get_recent_scheduler_logs(
current_user: Dict[str, Any] = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""
Get recent scheduler logs (restoration, job scheduling, etc.) for display in Execution Logs.
These are informational logs that show scheduler activity when actual execution logs are not available.
Returns only the latest 5 logs (rolling window, not accumulating).
Returns:
- List of latest 5 scheduler events (job_scheduled, job_completed, job_failed)
- Formatted as execution log-like entries for display
"""
try:
# Get only the latest 5 scheduler events - simple rolling window
# Focus on job-related events that indicate scheduler activity
query = db.query(SchedulerEventLog).filter(
SchedulerEventLog.event_type.in_(['job_scheduled', 'job_completed', 'job_failed'])
).order_by(desc(SchedulerEventLog.event_date)).limit(5)
events = query.all()
# Log for debugging - show more details
logger.warning(
f"[Dashboard] Recent scheduler logs query: found {len(events)} events"
)
if events:
for e in events:
logger.warning(
f"[Dashboard] - Event: {e.event_type} | "
f"Job ID: {e.job_id} | User: {e.user_id} | "
f"Date: {e.event_date} | Error: {bool(e.error_message)}"
)
else:
# Check if there are ANY events of these types
total_count = db.query(func.count(SchedulerEventLog.id)).filter(
SchedulerEventLog.event_type.in_(['job_scheduled', 'job_completed', 'job_failed'])
).scalar() or 0
logger.warning(
f"[Dashboard] No recent scheduler logs found (query returned 0). "
f"Total events of these types in DB: {total_count}"
)
# Format as execution log-like entries
formatted_logs = []
for event in events:
event_data = event.event_data or {}
# Determine status based on event type
status = 'running'
if event.event_type == 'job_completed':
status = 'success'
elif event.event_type == 'job_failed':
status = 'failed'
# Extract job function name
job_function = event_data.get('job_function') or event_data.get('function_name') or 'unknown'
# Extract execution time if available
execution_time_ms = None
if event_data.get('execution_time_seconds'):
execution_time_ms = int(event_data.get('execution_time_seconds', 0) * 1000)
log_entry = {
'id': f"scheduler_event_{event.id}",
'task_id': None,
'user_id': event.user_id,
'execution_date': event.event_date.isoformat() if event.event_date else None,
'status': status,
'error_message': event.error_message,
'execution_time_ms': execution_time_ms,
'result_data': None,
'created_at': event.created_at.isoformat() if event.created_at else None,
'task': {
'id': None,
'task_title': f"{event.event_type.replace('_', ' ').title()}: {event.job_id or 'N/A'}",
'component_name': 'Scheduler',
'metric': job_function,
'frequency': 'one-time'
},
'is_scheduler_log': True, # Flag to indicate this is a scheduler log, not execution log
'event_type': event.event_type,
'job_id': event.job_id
}
formatted_logs.append(log_entry)
# Log the formatted response for debugging
logger.warning(
f"[Dashboard] Formatted {len(formatted_logs)} scheduler logs for response. "
f"Sample log entry keys: {list(formatted_logs[0].keys()) if formatted_logs else 'none'}"
)
return {
'logs': formatted_logs,
'total_count': len(formatted_logs),
'limit': 5,
'offset': 0,
'has_more': False,
'is_scheduler_logs': True # Indicate these are scheduler logs, not execution logs
}
except Exception as e:
logger.error(f"Error getting recent scheduler logs: {e}")
raise HTTPException(status_code=500, detail=f"Failed to get recent scheduler logs: {str(e)}")
@router.get("/platform-insights/status/{user_id}")
async def get_platform_insights_status(
user_id: str,
db: Session = Depends(get_db),
current_user: Dict[str, Any] = Depends(get_current_user)
):
"""
Get platform insights task status for a user.
Returns:
- GSC insights tasks
- Bing insights tasks
- Task details for each platform (execution logs are served separately by /platform-insights/logs/{user_id})
Note: missing insights tasks are auto-created for any connected platform that lacks one.
"""
try:
# Verify user can only access their own data
if str(current_user.get('id')) != user_id:
raise HTTPException(status_code=403, detail="Access denied")
logger.debug(f"[Platform Insights Status] Getting status for user: {user_id}")
# Get all insights tasks for user
tasks = db.query(PlatformInsightsTask).filter(
PlatformInsightsTask.user_id == user_id
).order_by(PlatformInsightsTask.platform, PlatformInsightsTask.created_at).all()
# Check if user has connected platforms but missing insights tasks
# Auto-create missing tasks for connected platforms
from services.oauth_token_monitoring_service import get_connected_platforms
from services.platform_insights_monitoring_service import create_platform_insights_task
connected_platforms = get_connected_platforms(user_id)
insights_platforms = ['gsc', 'bing']
connected_insights = [p for p in connected_platforms if p in insights_platforms]
existing_platforms = {task.platform for task in tasks}
missing_platforms = [p for p in connected_insights if p not in existing_platforms]
if missing_platforms:
logger.info(
f"[Platform Insights Status] User {user_id} has connected platforms {missing_platforms} "
f"but missing insights tasks. Creating tasks..."
)
for platform in missing_platforms:
try:
# Don't fetch site_url here - it requires API calls
# The executor will fetch it when the task runs
# Create task without site_url to avoid API calls during status checks
result = create_platform_insights_task(
user_id=user_id,
platform=platform,
site_url=None, # Will be fetched by executor when task runs
db=db
)
if result.get('success'):
logger.info(f"[Platform Insights Status] Created {platform.upper()} insights task for user {user_id}")
else:
logger.warning(f"[Platform Insights Status] Failed to create {platform} task: {result.get('error')}")
except Exception as e:
logger.warning(f"[Platform Insights Status] Error creating {platform} task: {e}", exc_info=True)
# Re-query tasks after creation
tasks = db.query(PlatformInsightsTask).filter(
PlatformInsightsTask.user_id == user_id
).order_by(PlatformInsightsTask.platform, PlatformInsightsTask.created_at).all()
# Group tasks by platform
gsc_tasks = [t for t in tasks if t.platform == 'gsc']
bing_tasks = [t for t in tasks if t.platform == 'bing']
logger.debug(
f"[Platform Insights Status] Found {len(tasks)} total tasks: "
f"{len(gsc_tasks)} GSC, {len(bing_tasks)} Bing"
)
# Format tasks
def format_task(task: PlatformInsightsTask) -> Dict[str, Any]:
return {
'id': task.id,
'platform': task.platform,
'site_url': task.site_url,
'status': task.status,
'last_check': task.last_check.isoformat() if task.last_check else None,
'last_success': task.last_success.isoformat() if task.last_success else None,
'last_failure': task.last_failure.isoformat() if task.last_failure else None,
'failure_reason': task.failure_reason,
'next_check': task.next_check.isoformat() if task.next_check else None,
'created_at': task.created_at.isoformat() if task.created_at else None,
'updated_at': task.updated_at.isoformat() if task.updated_at else None
}
return {
'success': True,
'user_id': user_id,
'gsc_tasks': [format_task(t) for t in gsc_tasks],
'bing_tasks': [format_task(t) for t in bing_tasks],
'total_tasks': len(tasks)
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting platform insights status for user {user_id}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Failed to get platform insights status: {str(e)}")
@router.get("/website-analysis/status/{user_id}")
async def get_website_analysis_status(
user_id: str,
db: Session = Depends(get_db),
current_user: Dict[str, Any] = Depends(get_current_user)
):
"""
Get website analysis task status for a user.
Returns:
- User website tasks
- Competitor website tasks
- Task details and per-status counts (execution logs are served separately by /website-analysis/logs/{user_id})
"""
try:
# Verify user can only access their own data
if str(current_user.get('id')) != user_id:
raise HTTPException(status_code=403, detail="Access denied")
logger.debug(f"[Website Analysis Status] Getting status for user: {user_id}")
# Get all website analysis tasks for user
tasks = db.query(WebsiteAnalysisTask).filter(
WebsiteAnalysisTask.user_id == user_id
).order_by(WebsiteAnalysisTask.task_type, WebsiteAnalysisTask.created_at).all()
# Separate user website and competitor tasks
user_website_tasks = [t for t in tasks if t.task_type == 'user_website']
competitor_tasks = [t for t in tasks if t.task_type == 'competitor']
logger.debug(
f"[Website Analysis Status] Found {len(tasks)} tasks for user {user_id}: "
f"{len(user_website_tasks)} user website, {len(competitor_tasks)} competitors"
)
# Format tasks
def format_task(task: WebsiteAnalysisTask) -> Dict[str, Any]:
return {
'id': task.id,
'website_url': task.website_url,
'task_type': task.task_type,
'competitor_id': task.competitor_id,
'status': task.status,
'last_check': task.last_check.isoformat() if task.last_check else None,
'last_success': task.last_success.isoformat() if task.last_success else None,
'last_failure': task.last_failure.isoformat() if task.last_failure else None,
'failure_reason': task.failure_reason,
'next_check': task.next_check.isoformat() if task.next_check else None,
'frequency_days': task.frequency_days,
'created_at': task.created_at.isoformat() if task.created_at else None,
'updated_at': task.updated_at.isoformat() if task.updated_at else None
}
active_tasks = len([t for t in tasks if t.status == 'active'])
failed_tasks = len([t for t in tasks if t.status == 'failed'])
return {
'success': True,
'data': {
'user_id': user_id,
'user_website_tasks': [format_task(t) for t in user_website_tasks],
'competitor_tasks': [format_task(t) for t in competitor_tasks],
'total_tasks': len(tasks),
'active_tasks': active_tasks,
'failed_tasks': failed_tasks
}
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting website analysis status for user {user_id}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Failed to get website analysis status: {str(e)}")
@router.get("/website-analysis/logs/{user_id}")
async def get_website_analysis_logs(
user_id: str,
task_id: Optional[int] = Query(None),
limit: int = Query(10, ge=1, le=100),
offset: int = Query(0, ge=0),
db: Session = Depends(get_db),
current_user: Dict[str, Any] = Depends(get_current_user)
):
"""
Get execution logs for website analysis tasks.
Args:
user_id: User ID
task_id: Optional task ID to filter logs
limit: Maximum number of logs to return
offset: Pagination offset
Returns:
List of execution logs
"""
try:
# Verify user can only access their own data
if str(current_user.get('id')) != user_id:
raise HTTPException(status_code=403, detail="Access denied")
query = db.query(WebsiteAnalysisExecutionLog).join(
WebsiteAnalysisTask,
WebsiteAnalysisExecutionLog.task_id == WebsiteAnalysisTask.id
).filter(
WebsiteAnalysisTask.user_id == user_id
)
if task_id:
query = query.filter(WebsiteAnalysisExecutionLog.task_id == task_id)
# Get total count
total_count = query.count()
logs = query.order_by(
desc(WebsiteAnalysisExecutionLog.execution_date)
).offset(offset).limit(limit).all()
# Format logs
formatted_logs = []
for log in logs:
# Get task details
task = db.query(WebsiteAnalysisTask).filter(WebsiteAnalysisTask.id == log.task_id).first()
formatted_logs.append({
'id': log.id,
'task_id': log.task_id,
'website_url': task.website_url if task else None,
'task_type': task.task_type if task else None,
'execution_date': log.execution_date.isoformat() if log.execution_date else None,
'status': log.status,
'result_data': log.result_data,
'error_message': log.error_message,
'execution_time_ms': log.execution_time_ms,
'created_at': log.created_at.isoformat() if log.created_at else None
})
return {
'logs': formatted_logs,
'total_count': total_count,
'limit': limit,
'offset': offset,
'has_more': (offset + limit) < total_count
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting website analysis logs for user {user_id}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Failed to get website analysis logs: {str(e)}")
@router.post("/website-analysis/retry/{task_id}")
async def retry_website_analysis(
task_id: int,
db: Session = Depends(get_db),
current_user: Dict[str, Any] = Depends(get_current_user)
):
"""
Manually retry a failed website analysis task.
Args:
task_id: Task ID to retry
Returns:
Success status and updated task details
"""
try:
# Get task
task = db.query(WebsiteAnalysisTask).filter(WebsiteAnalysisTask.id == task_id).first()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
# Verify user can only access their own tasks
if str(current_user.get('id')) != task.user_id:
raise HTTPException(status_code=403, detail="Access denied")
# Reset task status and schedule immediate execution
task.status = 'active'
task.failure_reason = None
task.next_check = datetime.utcnow() # Schedule immediately
task.updated_at = datetime.utcnow()
db.commit()
logger.info(f"Manually retried website analysis task {task_id} for user {task.user_id}")
return {
'success': True,
'message': f'Website analysis task {task_id} scheduled for immediate execution',
'task': {
'id': task.id,
'website_url': task.website_url,
'status': task.status,
'next_check': task.next_check.isoformat() if task.next_check else None
}
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error retrying website analysis task {task_id}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Failed to retry website analysis: {str(e)}")
@router.get("/tasks-needing-intervention/{user_id}")
async def get_tasks_needing_intervention(
user_id: str,
db: Session = Depends(get_db),
current_user: Dict[str, Any] = Depends(get_current_user)
):
"""
Get all tasks that need human intervention.
Args:
user_id: User ID
Returns:
List of tasks needing intervention with failure pattern details
"""
try:
# Verify user access
if str(current_user.get('id')) != user_id:
raise HTTPException(status_code=403, detail="Access denied")
from services.scheduler.core.failure_detection_service import FailureDetectionService
detection_service = FailureDetectionService(db)
tasks = detection_service.get_tasks_needing_intervention(user_id=user_id)
return {
"success": True,
"tasks": tasks,
"count": len(tasks)
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting tasks needing intervention: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Failed to get tasks needing intervention: {str(e)}")
@router.post("/tasks/{task_type}/{task_id}/manual-trigger")
async def manual_trigger_task(
task_type: str,
task_id: int,
db: Session = Depends(get_db),
current_user: Dict[str, Any] = Depends(get_current_user)
):
"""
Manually trigger a task that is in cool-off or needs intervention.
This bypasses the cool-off check and executes the task immediately.
Args:
task_type: Task type (oauth_token_monitoring, website_analysis, gsc_insights, bing_insights)
task_id: Task ID
Returns:
Success status and execution result
"""
try:
from services.scheduler.core.task_execution_handler import execute_task_async
scheduler = get_scheduler()
# Load task based on type
task = None
if task_type == "oauth_token_monitoring":
task = db.query(OAuthTokenMonitoringTask).filter(
OAuthTokenMonitoringTask.id == task_id
).first()
elif task_type == "website_analysis":
task = db.query(WebsiteAnalysisTask).filter(
WebsiteAnalysisTask.id == task_id
).first()
elif task_type in ["gsc_insights", "bing_insights"]:
task = db.query(PlatformInsightsTask).filter(
PlatformInsightsTask.id == task_id
).first()
else:
raise HTTPException(status_code=400, detail=f"Unknown task type: {task_type}")
if not task:
raise HTTPException(status_code=404, detail="Task not found")
# Verify user access
if str(current_user.get('id')) != task.user_id:
raise HTTPException(status_code=403, detail="Access denied")
# Clear cool-off status and reset failure count
task.status = "active"
task.consecutive_failures = 0
task.failure_pattern = None
# Execute task manually (bypasses cool-off check)
# Task types are registered as: oauth_token_monitoring, website_analysis, gsc_insights, bing_insights
await execute_task_async(scheduler, task_type, task, execution_source="manual")
db.commit()
logger.info(f"Manually triggered task {task_id} ({task_type}) for user {task.user_id}")
return {
"success": True,
"message": "Task triggered successfully",
"task": {
"id": task.id,
"status": task.status,
"last_check": task.last_check.isoformat() if task.last_check else None
}
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error manually triggering task {task_id}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Failed to trigger task: {str(e)}")
@router.get("/platform-insights/logs/{user_id}")
async def get_platform_insights_logs(
user_id: str,
task_id: Optional[int] = Query(None),
limit: int = Query(10, ge=1, le=100),
db: Session = Depends(get_db),
current_user: Dict[str, Any] = Depends(get_current_user)
):
"""
Get execution logs for platform insights tasks.
Args:
user_id: User ID
task_id: Optional task ID to filter logs
limit: Maximum number of logs to return
Returns:
List of execution logs
"""
try:
# Verify user can only access their own data
if str(current_user.get('id')) != user_id:
raise HTTPException(status_code=403, detail="Access denied")
query = db.query(PlatformInsightsExecutionLog).join(
PlatformInsightsTask,
PlatformInsightsExecutionLog.task_id == PlatformInsightsTask.id
).filter(
PlatformInsightsTask.user_id == user_id
)
if task_id:
query = query.filter(PlatformInsightsExecutionLog.task_id == task_id)
# Get total count before applying the limit (matches the other log endpoints)
total_count = query.count()
logs = query.order_by(
desc(PlatformInsightsExecutionLog.execution_date)
).limit(limit).all()
def format_log(log: PlatformInsightsExecutionLog) -> Dict[str, Any]:
return {
'id': log.id,
'task_id': log.task_id,
'execution_date': log.execution_date.isoformat() if log.execution_date else None,
'status': log.status,
'result_data': log.result_data,
'error_message': log.error_message,
'execution_time_ms': log.execution_time_ms,
'data_source': log.data_source,
'created_at': log.created_at.isoformat() if log.created_at else None
}
return {
'success': True,
'logs': [format_log(log) for log in logs],
'total_count': total_count
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting platform insights logs for user {user_id}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Failed to get platform insights logs: {str(e)}")