Scheduled research persona generation
This commit is contained in:
@@ -10,6 +10,7 @@ from datetime import datetime
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
from apscheduler.triggers.cron import CronTrigger
|
||||
from apscheduler.triggers.interval import IntervalTrigger
|
||||
from apscheduler.triggers.date import DateTrigger
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from .executor_interface import TaskExecutor, TaskExecutionResult
|
||||
@@ -20,6 +21,13 @@ from .exception_handler import (
|
||||
)
|
||||
from services.database import get_db_session
|
||||
from utils.logger_utils import get_service_logger
|
||||
from ..utils.user_job_store import get_user_job_store_name
|
||||
from models.scheduler_models import SchedulerEventLog
|
||||
from .interval_manager import determine_optimal_interval, adjust_check_interval_if_needed
|
||||
from .job_restoration import restore_persona_jobs
|
||||
from .oauth_task_restoration import restore_oauth_monitoring_tasks
|
||||
from .check_cycle_handler import check_and_execute_due_tasks
|
||||
from .task_execution_handler import execute_task_async
|
||||
|
||||
logger = get_service_logger("task_scheduler")
|
||||
|
||||
@@ -34,6 +42,14 @@ class TaskScheduler:
|
||||
- Database-backed task persistence
|
||||
- Configurable check intervals
|
||||
- Automatic retry logic
|
||||
- User isolation: All tasks are filtered by user_id for isolation
|
||||
- Per-user job store context: Logs show user's website root for debugging
|
||||
|
||||
User Isolation:
|
||||
- Tasks are filtered by user_id in task loaders
|
||||
- Execution logs include user_id for tracking
|
||||
- Per-user statistics are maintained
|
||||
- Job store names (based on website root) are logged for debugging
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@@ -63,7 +79,7 @@ class TaskScheduler:
|
||||
job_defaults={
|
||||
'coalesce': True,
|
||||
'max_instances': 1,
|
||||
'misfire_grace_time': 300 # 5 minutes grace period
|
||||
'misfire_grace_time': 3600 # 1 hour grace period for missed jobs
|
||||
}
|
||||
)
|
||||
|
||||
@@ -89,6 +105,7 @@ class TaskScheduler:
|
||||
'tasks_failed': 0,
|
||||
'tasks_skipped': 0,
|
||||
'last_check': None,
|
||||
'last_update': datetime.utcnow().isoformat(), # Timestamp for frontend polling
|
||||
'per_user_stats': {}, # Track metrics per user for user isolation
|
||||
'active_strategies_count': 0, # Track active strategies with tasks
|
||||
'last_interval_adjustment': None # Track when interval was last adjusted
|
||||
@@ -141,7 +158,11 @@ class TaskScheduler:
|
||||
|
||||
try:
|
||||
# Determine initial check interval based on active strategies
|
||||
initial_interval = await self._determine_optimal_interval()
|
||||
initial_interval = await determine_optimal_interval(
|
||||
self,
|
||||
self.min_check_interval_minutes,
|
||||
self.max_check_interval_minutes
|
||||
)
|
||||
self.current_check_interval_minutes = initial_interval
|
||||
|
||||
# Add periodic job to check for due tasks
|
||||
@@ -155,16 +176,228 @@ class TaskScheduler:
|
||||
self.scheduler.start()
|
||||
self._running = True
|
||||
|
||||
logger.info(
|
||||
f"Task scheduler started | "
|
||||
f"check_interval={initial_interval}min | "
|
||||
f"registered_types={self.registry.get_registered_types()}"
|
||||
)
|
||||
# Check for and execute any missed jobs that are still within grace period
|
||||
await self._execute_missed_jobs()
|
||||
|
||||
# Restore one-time persona generation jobs for users who completed onboarding
|
||||
await restore_persona_jobs(self)
|
||||
|
||||
# Restore/create missing OAuth token monitoring tasks for connected platforms
|
||||
await restore_oauth_monitoring_tasks(self)
|
||||
|
||||
# Get all scheduled APScheduler jobs (including one-time tasks)
|
||||
all_jobs = self.scheduler.get_jobs()
|
||||
registered_types = self.registry.get_registered_types()
|
||||
active_strategies = self.stats.get('active_strategies_count', 0)
|
||||
|
||||
# Count OAuth token monitoring tasks from database (recurring weekly tasks)
|
||||
oauth_tasks_count = 0
|
||||
oauth_tasks_details = []
|
||||
try:
|
||||
db = get_db_session()
|
||||
if db:
|
||||
from models.oauth_token_monitoring_models import OAuthTokenMonitoringTask
|
||||
# Count active tasks
|
||||
oauth_tasks_count = db.query(OAuthTokenMonitoringTask).filter(
|
||||
OAuthTokenMonitoringTask.status == 'active'
|
||||
).count()
|
||||
|
||||
# Get all tasks (for detailed logging)
|
||||
all_oauth_tasks = db.query(OAuthTokenMonitoringTask).all()
|
||||
total_oauth_tasks = len(all_oauth_tasks)
|
||||
|
||||
# Show platform breakdown for ALL tasks (active and inactive)
|
||||
all_platforms = {}
|
||||
active_platforms = {}
|
||||
for task in all_oauth_tasks:
|
||||
all_platforms[task.platform] = all_platforms.get(task.platform, 0) + 1
|
||||
if task.status == 'active':
|
||||
active_platforms[task.platform] = active_platforms.get(task.platform, 0) + 1
|
||||
|
||||
if total_oauth_tasks > 0:
|
||||
# Log details about all tasks (not just active)
|
||||
for task in all_oauth_tasks:
|
||||
oauth_tasks_details.append(
|
||||
f"user={task.user_id}, platform={task.platform}, status={task.status}"
|
||||
)
|
||||
|
||||
if total_oauth_tasks > 0 and oauth_tasks_count == 0:
|
||||
all_platform_summary = ", ".join([f"{p}: {c}" for p, c in sorted(all_platforms.items())])
|
||||
logger.warning(
|
||||
f"[Scheduler] Found {total_oauth_tasks} OAuth monitoring tasks in database, "
|
||||
f"but {oauth_tasks_count} are active. "
|
||||
f"All platforms: {all_platform_summary}. "
|
||||
f"Task details: {', '.join(oauth_tasks_details[:5])}" # Limit to first 5 for readability
|
||||
)
|
||||
elif oauth_tasks_count > 0:
|
||||
# Show platform breakdown for active tasks
|
||||
active_platform_summary = ", ".join([f"{platform}: {count}" for platform, count in sorted(active_platforms.items())])
|
||||
all_platform_summary = ", ".join([f"{p}: {c}" for p, c in sorted(all_platforms.items())])
|
||||
|
||||
# Check for missing platforms (expected: gsc, bing, wordpress, wix)
|
||||
expected_platforms = ['gsc', 'bing', 'wordpress', 'wix']
|
||||
missing_in_db = [p for p in expected_platforms if p not in all_platforms]
|
||||
|
||||
if missing_in_db:
|
||||
logger.warning(
|
||||
f"[Scheduler] Found {oauth_tasks_count} active OAuth monitoring tasks "
|
||||
f"(total: {total_oauth_tasks}). Active platforms: {active_platform_summary}. "
|
||||
f"All platforms: {all_platform_summary}. "
|
||||
f"⚠️ Missing platforms (not connected or no tasks): {', '.join(missing_in_db)}"
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
f"[Scheduler] Found {oauth_tasks_count} active OAuth monitoring tasks "
|
||||
f"(total: {total_oauth_tasks}). Active platforms: {active_platform_summary}. "
|
||||
f"All platforms: {all_platform_summary}"
|
||||
)
|
||||
|
||||
db.close()
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"[Scheduler] Could not get OAuth token monitoring tasks count: {e}. "
|
||||
f"This may indicate the oauth_token_monitoring_tasks table doesn't exist yet or "
|
||||
f"tasks haven't been created. Error type: {type(e).__name__}"
|
||||
)
|
||||
|
||||
# Calculate job counts
|
||||
apscheduler_recurring = 1 # check_due_tasks
|
||||
apscheduler_one_time = len(all_jobs) - 1
|
||||
total_recurring = apscheduler_recurring + oauth_tasks_count
|
||||
total_jobs = len(all_jobs) + oauth_tasks_count
|
||||
|
||||
# Build comprehensive startup log message
|
||||
startup_lines = [
|
||||
f"[Scheduler] ✅ Task Scheduler Started",
|
||||
f" ├─ Check Interval: {initial_interval} minutes",
|
||||
f" ├─ Registered Task Types: {len(registered_types)} ({', '.join(registered_types) if registered_types else 'none'})",
|
||||
f" ├─ Active Strategies: {active_strategies}",
|
||||
f" ├─ Total Scheduled Jobs: {total_jobs}",
|
||||
f" ├─ Recurring Jobs: {total_recurring} (check_due_tasks: {apscheduler_recurring}, OAuth monitoring: {oauth_tasks_count})",
|
||||
f" └─ One-Time Jobs: {apscheduler_one_time}"
|
||||
]
|
||||
|
||||
# Add APScheduler job details
|
||||
if all_jobs:
|
||||
for idx, job in enumerate(all_jobs):
|
||||
is_last = idx == len(all_jobs) - 1 and oauth_tasks_count == 0
|
||||
prefix = " └─" if is_last else " ├─"
|
||||
next_run = job.next_run_time
|
||||
trigger_type = type(job.trigger).__name__
|
||||
|
||||
# Try to extract user_id from job ID or kwargs for context
|
||||
user_context = ""
|
||||
user_id_from_job = None
|
||||
|
||||
# First try to get from kwargs
|
||||
if hasattr(job, 'kwargs') and job.kwargs and job.kwargs.get('user_id'):
|
||||
user_id_from_job = job.kwargs.get('user_id')
|
||||
# Otherwise, try to extract from job ID (e.g., "research_persona_user_123..." or "research_persona_user123")
|
||||
elif job.id and ('research_persona_' in job.id or 'facebook_persona_' in job.id):
|
||||
# Job ID format: research_persona_{user_id} or facebook_persona_{user_id}
|
||||
# where user_id is Clerk format (e.g., "user_33Gz1FPI86VDXhRY8QN4ragRFGN")
|
||||
if job.id.startswith('research_persona_'):
|
||||
user_id_from_job = job.id.replace('research_persona_', '')
|
||||
elif job.id.startswith('facebook_persona_'):
|
||||
user_id_from_job = job.id.replace('facebook_persona_', '')
|
||||
else:
|
||||
# Fallback: try to extract from parts (old format with timestamp)
|
||||
parts = job.id.split('_')
|
||||
if len(parts) >= 3:
|
||||
user_id_from_job = parts[2] # Extract user_id from job ID
|
||||
|
||||
if user_id_from_job:
|
||||
try:
|
||||
db = get_db_session()
|
||||
if db:
|
||||
user_job_store = get_user_job_store_name(user_id_from_job, db)
|
||||
if user_job_store == 'default':
|
||||
logger.debug(
|
||||
f"[Scheduler] Job store extraction returned 'default' for user {user_id_from_job}. "
|
||||
f"This may indicate no onboarding data or website URL not found."
|
||||
)
|
||||
user_context = f" | User: {user_id_from_job} | Store: {user_job_store}"
|
||||
db.close()
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"[Scheduler] Could not extract job store name for user {user_id_from_job}: {e}. "
|
||||
f"Error type: {type(e).__name__}"
|
||||
)
|
||||
user_context = f" | User: {user_id_from_job}"
|
||||
|
||||
startup_lines.append(f"{prefix} Job: {job.id} | Trigger: {trigger_type} | Next Run: {next_run}{user_context}")
|
||||
|
||||
# Add OAuth token monitoring tasks details
|
||||
# Show ALL OAuth tasks (active and inactive) for complete visibility
|
||||
if total_oauth_tasks > 0:
|
||||
try:
|
||||
db = get_db_session()
|
||||
if db:
|
||||
from models.oauth_token_monitoring_models import OAuthTokenMonitoringTask
|
||||
# Get ALL tasks, not just active ones
|
||||
oauth_tasks = db.query(OAuthTokenMonitoringTask).all()
|
||||
|
||||
for idx, task in enumerate(oauth_tasks):
|
||||
is_last = idx == len(oauth_tasks) - 1 and len(all_jobs) == 0
|
||||
prefix = " └─" if is_last else " ├─"
|
||||
|
||||
try:
|
||||
user_job_store = get_user_job_store_name(task.user_id, db)
|
||||
if user_job_store == 'default':
|
||||
logger.debug(
|
||||
f"[Scheduler] Job store extraction returned 'default' for user {task.user_id}. "
|
||||
f"This may indicate no onboarding data or website URL not found."
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"[Scheduler] Could not extract job store name for user {task.user_id}: {e}. "
|
||||
f"Using 'default'. Error type: {type(e).__name__}"
|
||||
)
|
||||
user_job_store = 'default'
|
||||
|
||||
next_check = task.next_check.isoformat() if task.next_check else 'Not scheduled'
|
||||
# Include status in the log line for visibility
|
||||
status_indicator = "✅" if task.status == 'active' else f"[{task.status}]"
|
||||
startup_lines.append(
|
||||
f"{prefix} Job: oauth_token_monitoring_{task.platform}_{task.user_id} | "
|
||||
f"Trigger: CronTrigger (Weekly) | Next Run: {next_check} | "
|
||||
f"User: {task.user_id} | Store: {user_job_store} | Platform: {task.platform} {status_indicator}"
|
||||
)
|
||||
db.close()
|
||||
except Exception as e:
|
||||
logger.debug(f"Could not get OAuth token monitoring task details: {e}")
|
||||
|
||||
# Log comprehensive startup information in single message
|
||||
logger.warning("\n".join(startup_lines))
|
||||
|
||||
# Save scheduler start event to database
|
||||
try:
|
||||
db = get_db_session()
|
||||
if db:
|
||||
event_log = SchedulerEventLog(
|
||||
event_type='start',
|
||||
event_date=datetime.utcnow(),
|
||||
check_interval_minutes=initial_interval,
|
||||
active_strategies_count=active_strategies,
|
||||
event_data={
|
||||
'registered_types': registered_types,
|
||||
'total_jobs': total_jobs,
|
||||
'recurring_jobs': total_recurring,
|
||||
'one_time_jobs': apscheduler_one_time,
|
||||
'oauth_monitoring_tasks': oauth_tasks_count
|
||||
}
|
||||
)
|
||||
db.add(event_log)
|
||||
db.commit()
|
||||
db.close()
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to save scheduler start event log: {e}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to start scheduler: {e}")
|
||||
raise
|
||||
|
||||
|
||||
async def stop(self):
|
||||
"""Stop the scheduler gracefully."""
|
||||
if not self._running:
|
||||
@@ -182,11 +415,48 @@ class TaskScheduler:
|
||||
timeout=30
|
||||
)
|
||||
|
||||
# Get final job count before shutdown
|
||||
all_jobs_before = self.scheduler.get_jobs()
|
||||
|
||||
# Shutdown scheduler
|
||||
self.scheduler.shutdown(wait=True)
|
||||
self._running = False
|
||||
|
||||
logger.info("Task scheduler stopped gracefully")
|
||||
# Log comprehensive shutdown information (use WARNING level for visibility)
|
||||
total_checks = self.stats.get('total_checks', 0)
|
||||
total_executed = self.stats.get('tasks_executed', 0)
|
||||
total_failed = self.stats.get('tasks_failed', 0)
|
||||
|
||||
shutdown_message = (
|
||||
f"[Scheduler] 🛑 Task Scheduler Stopped\n"
|
||||
f" ├─ Total Check Cycles: {total_checks}\n"
|
||||
f" ├─ Total Tasks Executed: {total_executed}\n"
|
||||
f" ├─ Total Tasks Failed: {total_failed}\n"
|
||||
f" ├─ Jobs Cancelled: {len(all_jobs_before)}\n"
|
||||
f" └─ Shutdown: Graceful"
|
||||
)
|
||||
logger.warning(shutdown_message)
|
||||
|
||||
# Save scheduler stop event to database
|
||||
try:
|
||||
db = get_db_session()
|
||||
if db:
|
||||
event_log = SchedulerEventLog(
|
||||
event_type='stop',
|
||||
event_date=datetime.utcnow(),
|
||||
check_interval_minutes=self.current_check_interval_minutes,
|
||||
event_data={
|
||||
'total_checks': total_checks,
|
||||
'total_executed': total_executed,
|
||||
'total_failed': total_failed,
|
||||
'jobs_cancelled': len(all_jobs_before)
|
||||
}
|
||||
)
|
||||
db.add(event_log)
|
||||
db.commit()
|
||||
db.close()
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to save scheduler stop event log: {e}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error stopping scheduler: {e}")
|
||||
@@ -197,109 +467,50 @@ class TaskScheduler:
|
||||
Main scheduler loop: check for due tasks and execute them.
|
||||
This runs periodically with intelligent interval adjustment based on active strategies.
|
||||
"""
|
||||
self.stats['total_checks'] += 1
|
||||
self.stats['last_check'] = datetime.utcnow().isoformat()
|
||||
|
||||
logger.debug("Checking for due tasks...")
|
||||
|
||||
db = None
|
||||
try:
|
||||
db = get_db_session()
|
||||
if db is None:
|
||||
logger.error("Failed to get database session")
|
||||
return
|
||||
|
||||
# Check for active strategies and adjust interval intelligently
|
||||
await self._adjust_check_interval_if_needed(db)
|
||||
|
||||
# Check each registered task type
|
||||
for task_type in self.registry.get_registered_types():
|
||||
await self._process_task_type(task_type, db)
|
||||
|
||||
except Exception as e:
|
||||
error = DatabaseError(
|
||||
message=f"Error checking for due tasks: {str(e)}",
|
||||
original_error=e
|
||||
)
|
||||
self.exception_handler.handle_exception(error)
|
||||
finally:
|
||||
if db:
|
||||
db.close()
|
||||
|
||||
async def _determine_optimal_interval(self) -> int:
|
||||
"""
|
||||
Determine optimal check interval based on active strategies.
|
||||
|
||||
Returns:
|
||||
Optimal check interval in minutes
|
||||
"""
|
||||
db = None
|
||||
try:
|
||||
db = get_db_session()
|
||||
if db:
|
||||
from services.active_strategy_service import ActiveStrategyService
|
||||
active_strategy_service = ActiveStrategyService(db_session=db)
|
||||
active_count = active_strategy_service.count_active_strategies_with_tasks()
|
||||
self.stats['active_strategies_count'] = active_count
|
||||
|
||||
if active_count > 0:
|
||||
logger.info(f"Found {active_count} active strategies with tasks - using {self.min_check_interval_minutes}min interval")
|
||||
return self.min_check_interval_minutes
|
||||
else:
|
||||
logger.info(f"No active strategies with tasks - using {self.max_check_interval_minutes}min interval")
|
||||
return self.max_check_interval_minutes
|
||||
except Exception as e:
|
||||
logger.warning(f"Error determining optimal interval: {e}, using default {self.min_check_interval_minutes}min")
|
||||
finally:
|
||||
if db:
|
||||
db.close()
|
||||
|
||||
# Default to shorter interval on error (safer)
|
||||
return self.min_check_interval_minutes
|
||||
await check_and_execute_due_tasks(self)
|
||||
|
||||
async def _adjust_check_interval_if_needed(self, db: Session):
    """
    Re-evaluate how often the scheduler should poll for due tasks.

    Thin wrapper kept on the class for existing callers: the actual
    decision is delegated to ``adjust_check_interval_if_needed`` from the
    ``interval_manager`` module, which receives this scheduler instance
    and the caller's database session. The intent is to poll more often
    while active strategies have tasks and less often when none do.

    Args:
        db: Open database session owned by the caller; it is passed
            through unchanged and is not closed here.
    """
    await adjust_check_interval_if_needed(self, db)
|
||||
|
||||
async def _execute_missed_jobs(self):
    """
    Run one-time (DateTrigger) jobs whose scheduled time has already passed.

    Only jobs that are at most one hour late are executed; older ones are
    left untouched. The whole routine is best-effort: any failure is
    logged and never propagated to the caller.

    Each job is unscheduled *before* it is executed manually so that the
    scheduler does not run it a second time within its own misfire grace
    period. If the job is already gone from the job store it is assumed
    that the scheduler picked it up itself, and it is skipped here.
    """
    # Same window as the misfire_grace_time used when the jobs are scheduled.
    grace_period_seconds = 3600

    try:
        # Local imports keep this block self-contained and keep an import
        # problem from aborting scheduler start-up.
        import inspect
        from datetime import timezone
        from apscheduler.jobstores.base import JobLookupError

        # A real timezone-aware "now". Aware datetimes compare correctly
        # across zones, so this works whatever the scheduler timezone is
        # (stamping utcnow() with the scheduler zone is only right for UTC).
        now = datetime.now(timezone.utc)

        missed_jobs = []
        for job in self.scheduler.get_jobs():
            # Only one-time tasks are considered.
            if not isinstance(getattr(job, 'trigger', None), DateTrigger):
                continue
            next_run = getattr(job, 'next_run_time', None)
            if not next_run or next_run >= now:
                continue
            seconds_late = (now - next_run).total_seconds()
            if seconds_late <= grace_period_seconds:
                missed_jobs.append(job)

        if not missed_jobs:
            return

        logger.warning(
            f"[Scheduler] Found {len(missed_jobs)} missed job(s) within grace period, executing now..."
        )

        for job in missed_jobs:
            try:
                try:
                    # Unschedule first to avoid a duplicate run by the scheduler.
                    job.remove()
                except JobLookupError:
                    logger.debug(
                        f"[Scheduler] Missed job {job.id} is no longer scheduled, skipping manual execution"
                    )
                    continue

                # Execute the job immediately
                logger.info(f"[Scheduler] Executing missed job: {job.id}")
                outcome = job.func(*job.args, **job.kwargs)
                # Job callables are expected to be async, but tolerate sync ones.
                if inspect.isawaitable(outcome):
                    await outcome
            except Exception as e:
                logger.error(f"[Scheduler] Error executing missed job {job.id}: {e}")
    except Exception as e:
        logger.warning(f"[Scheduler] Error checking for missed jobs: {e}")
|
||||
|
||||
async def trigger_interval_adjustment(self):
|
||||
"""
|
||||
@@ -315,14 +526,22 @@ class TaskScheduler:
|
||||
try:
|
||||
db = get_db_session()
|
||||
if db:
|
||||
await self._adjust_check_interval_if_needed(db)
|
||||
await adjust_check_interval_if_needed(self, db)
|
||||
db.close()
|
||||
else:
|
||||
logger.warning("Could not get database session for interval adjustment")
|
||||
except Exception as e:
|
||||
logger.warning(f"Error triggering interval adjustment: {e}")
|
||||
|
||||
async def _process_task_type(self, task_type: str, db: Session):
|
||||
"""Process due tasks for a specific task type."""
|
||||
async def _process_task_type(self, task_type: str, db: Session, cycle_summary: Dict[str, Any] = None) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
Process due tasks for a specific task type.
|
||||
|
||||
Returns:
|
||||
Summary dict with 'found', 'executed', 'failed' counts, or None if no tasks
|
||||
"""
|
||||
summary = {'found': 0, 'executed': 0, 'failed': 0}
|
||||
|
||||
try:
|
||||
# Get task loader for this type
|
||||
try:
|
||||
@@ -334,7 +553,7 @@ class TaskScheduler:
|
||||
original_error=e
|
||||
)
|
||||
self.exception_handler.handle_exception(error)
|
||||
return
|
||||
return None
|
||||
|
||||
# Load due tasks (with error handling)
|
||||
try:
|
||||
@@ -346,28 +565,30 @@ class TaskScheduler:
|
||||
original_error=e
|
||||
)
|
||||
self.exception_handler.handle_exception(error)
|
||||
return
|
||||
return None
|
||||
|
||||
if not due_tasks:
|
||||
return
|
||||
return None
|
||||
|
||||
summary['found'] = len(due_tasks)
|
||||
self.stats['tasks_found'] += len(due_tasks)
|
||||
logger.info(f"Found {len(due_tasks)} due tasks for type: {task_type}")
|
||||
|
||||
# Execute tasks (with concurrency limit)
|
||||
execution_tasks = []
|
||||
skipped_count = 0
|
||||
for task in due_tasks:
|
||||
if len(self.active_executions) >= self.max_concurrent_executions:
|
||||
skipped_count = len(due_tasks) - len(execution_tasks)
|
||||
logger.warning(
|
||||
f"Max concurrent executions reached ({self.max_concurrent_executions}), "
|
||||
f"skipping {len(due_tasks) - len(execution_tasks)} tasks"
|
||||
f"[Scheduler] ⚠️ Max concurrent executions reached ({self.max_concurrent_executions}), "
|
||||
f"skipping {skipped_count} tasks for {task_type}"
|
||||
)
|
||||
break
|
||||
|
||||
# Execute task asynchronously
|
||||
# Note: Each task gets its own database session to prevent concurrent access issues
|
||||
execution_task = asyncio.create_task(
|
||||
self._execute_task_async(task_type, task)
|
||||
execute_task_async(self, task_type, task, summary)
|
||||
)
|
||||
|
||||
task_id = f"{task_type}_{getattr(task, 'id', id(task))}"
|
||||
@@ -379,6 +600,8 @@ class TaskScheduler:
|
||||
if execution_tasks:
|
||||
await asyncio.wait(execution_tasks, timeout=300)
|
||||
|
||||
return summary
|
||||
|
||||
except Exception as e:
|
||||
error = TaskLoaderError(
|
||||
message=f"Error processing task type {task_type}: {str(e)}",
|
||||
@@ -386,169 +609,8 @@ class TaskScheduler:
|
||||
original_error=e
|
||||
)
|
||||
self.exception_handler.handle_exception(error)
|
||||
return summary
|
||||
|
||||
async def _execute_task_async(self, task_type: str, task: Any):
|
||||
"""
|
||||
Execute a single task asynchronously with user isolation.
|
||||
|
||||
Each task gets its own database session to prevent concurrent access issues,
|
||||
as SQLAlchemy sessions are not async-safe or concurrent-safe.
|
||||
|
||||
User context is extracted and tracked for user isolation.
|
||||
|
||||
Args:
|
||||
task_type: Type of task
|
||||
task: Task instance from database (detached from original session)
|
||||
"""
|
||||
task_id = f"{task_type}_{getattr(task, 'id', id(task))}"
|
||||
db = None
|
||||
user_id = None
|
||||
|
||||
try:
|
||||
# Extract user context if available (for user isolation tracking)
|
||||
try:
|
||||
if hasattr(task, 'strategy') and task.strategy:
|
||||
user_id = getattr(task.strategy, 'user_id', None)
|
||||
elif hasattr(task, 'strategy_id') and task.strategy_id:
|
||||
# Will query user_id after we have db session
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.debug(f"Could not extract user_id before execution for task {task_id}: {e}")
|
||||
|
||||
logger.info(f"Executing task: {task_id} | user_id: {user_id}")
|
||||
|
||||
# Create a new database session for this async task
|
||||
# SQLAlchemy sessions are not async-safe and cannot be shared across concurrent tasks
|
||||
db = get_db_session()
|
||||
if db is None:
|
||||
error = DatabaseError(
|
||||
message=f"Failed to get database session for task {task_id}",
|
||||
user_id=user_id,
|
||||
task_id=getattr(task, 'id', None),
|
||||
task_type=task_type
|
||||
)
|
||||
self.exception_handler.handle_exception(error, log_level="error")
|
||||
self.stats['tasks_failed'] += 1
|
||||
self._update_user_stats(user_id, success=False)
|
||||
return
|
||||
|
||||
# Set database session for exception handler
|
||||
self.exception_handler.db = db
|
||||
|
||||
# Merge the detached task object into this session
|
||||
# The task object was loaded in a different session and is now detached
|
||||
from sqlalchemy.orm import object_session
|
||||
if object_session(task) is None:
|
||||
# Task is detached, need to merge it into this session
|
||||
task = db.merge(task)
|
||||
|
||||
# Extract user_id after merge if not already available
|
||||
if user_id is None and hasattr(task, 'strategy'):
|
||||
try:
|
||||
if task.strategy:
|
||||
user_id = getattr(task.strategy, 'user_id', None)
|
||||
elif hasattr(task, 'strategy_id'):
|
||||
# Query strategy if relationship not loaded
|
||||
from models.enhanced_strategy_models import EnhancedContentStrategy
|
||||
strategy = db.query(EnhancedContentStrategy).filter(
|
||||
EnhancedContentStrategy.id == task.strategy_id
|
||||
).first()
|
||||
if strategy:
|
||||
user_id = strategy.user_id
|
||||
except Exception as e:
|
||||
logger.debug(f"Could not extract user_id after merge for task {task_id}: {e}")
|
||||
|
||||
# Get executor for this task type
|
||||
try:
|
||||
executor = self.registry.get_executor(task_type)
|
||||
except Exception as e:
|
||||
from .exception_handler import SchedulerConfigError
|
||||
error = SchedulerConfigError(
|
||||
message=f"Failed to get executor for task type {task_type}: {str(e)}",
|
||||
user_id=user_id,
|
||||
context={
|
||||
"task_id": getattr(task, 'id', None),
|
||||
"task_type": task_type
|
||||
},
|
||||
original_error=e
|
||||
)
|
||||
self.exception_handler.handle_exception(error)
|
||||
self.stats['tasks_failed'] += 1
|
||||
self._update_user_stats(user_id, success=False)
|
||||
return
|
||||
|
||||
# Execute task with its own session (with error handling)
|
||||
try:
|
||||
result = await executor.execute_task(task, db)
|
||||
|
||||
# Handle result and update statistics
|
||||
if result.success:
|
||||
self.stats['tasks_executed'] += 1
|
||||
self._update_user_stats(user_id, success=True)
|
||||
logger.info(f"Task executed successfully: {task_id} | user_id: {user_id}")
|
||||
else:
|
||||
self.stats['tasks_failed'] += 1
|
||||
self._update_user_stats(user_id, success=False)
|
||||
|
||||
# Create structured error for failed execution
|
||||
error = TaskExecutionError(
|
||||
message=result.error_message or "Task execution failed",
|
||||
user_id=user_id,
|
||||
task_id=getattr(task, 'id', None),
|
||||
task_type=task_type,
|
||||
execution_time_ms=result.execution_time_ms,
|
||||
context={"result_data": result.result_data}
|
||||
)
|
||||
self.exception_handler.handle_exception(error, log_level="warning")
|
||||
|
||||
# Retry logic if enabled
|
||||
if self.enable_retries and result.retryable:
|
||||
await self._schedule_retry(task, result.retry_delay)
|
||||
|
||||
except SchedulerException as e:
|
||||
# Re-raise scheduler exceptions (they're already handled)
|
||||
raise
|
||||
except Exception as e:
|
||||
# Wrap unexpected exceptions
|
||||
error = TaskExecutionError(
|
||||
message=f"Unexpected error during task execution: {str(e)}",
|
||||
user_id=user_id,
|
||||
task_id=getattr(task, 'id', None),
|
||||
task_type=task_type,
|
||||
original_error=e
|
||||
)
|
||||
self.exception_handler.handle_exception(error)
|
||||
self.stats['tasks_failed'] += 1
|
||||
self._update_user_stats(user_id, success=False)
|
||||
|
||||
except SchedulerException as e:
|
||||
# Handle scheduler exceptions
|
||||
self.exception_handler.handle_exception(e)
|
||||
self.stats['tasks_failed'] += 1
|
||||
self._update_user_stats(user_id, success=False)
|
||||
except Exception as e:
|
||||
# Handle any other unexpected errors
|
||||
error = TaskExecutionError(
|
||||
message=f"Unexpected error in task execution wrapper: {str(e)}",
|
||||
user_id=user_id,
|
||||
task_id=getattr(task, 'id', None),
|
||||
task_type=task_type,
|
||||
original_error=e
|
||||
)
|
||||
self.exception_handler.handle_exception(error)
|
||||
self.stats['tasks_failed'] += 1
|
||||
self._update_user_stats(user_id, success=False)
|
||||
finally:
|
||||
# Clean up database session
|
||||
if db:
|
||||
try:
|
||||
db.close()
|
||||
except Exception as e:
|
||||
logger.error(f"Error closing database session for task {task_id}: {e}")
|
||||
|
||||
# Remove from active executions
|
||||
if task_id in self.active_executions:
|
||||
del self.active_executions[task_id]
|
||||
|
||||
def _update_user_stats(self, user_id: Optional[int], success: bool):
|
||||
"""
|
||||
@@ -622,6 +684,117 @@ class TaskScheduler:
|
||||
|
||||
return base_stats
|
||||
|
||||
def schedule_one_time_task(
    self,
    func: Callable,
    run_date: datetime,
    job_id: str,
    args: tuple = (),
    kwargs: Optional[Dict[str, Any]] = None,
    replace_existing: bool = True
) -> str:
    """
    Schedule a one-time task to run at a specific datetime.

    The job is registered with a DateTrigger and a one-hour misfire grace
    period. After registration the method logs a summary and, best-effort,
    stores a 'job_scheduled' SchedulerEventLog row; failures of that
    bookkeeping never prevent the job from being scheduled.

    Args:
        func: Async function to execute
        run_date: Datetime when the task should run. A naive value is
            interpreted as UTC.
        job_id: Unique identifier for this job
        args: Positional arguments to pass to func
        kwargs: Keyword arguments to pass to func. A 'user_id' entry, if
            present, is used for logging and the event log.
        replace_existing: If True, replace existing job with same ID

    Returns:
        Job ID

    Raises:
        Exception: Any error raised while registering the job is logged
            and re-raised.
    """
    if not self._running:
        logger.warning(
            f"Scheduler not running, but scheduling job {job_id} anyway. "
            "APScheduler will start automatically when needed."
        )

    job_kwargs = kwargs or {}

    try:
        # Ensure run_date is timezone-aware (UTC)
        if run_date.tzinfo is None:
            from datetime import timezone
            run_date = run_date.replace(tzinfo=timezone.utc)
            logger.debug(f"Added UTC timezone to run_date: {run_date}")

        self.scheduler.add_job(
            func,
            trigger=DateTrigger(run_date=run_date),
            args=args,
            kwargs=job_kwargs,
            id=job_id,
            replace_existing=replace_existing,
            misfire_grace_time=3600  # 1 hour grace period for missed jobs
        )

        # Get updated job count
        all_jobs = self.scheduler.get_jobs()
        one_time_jobs = [j for j in all_jobs if j.id != 'check_due_tasks']

        # Extract user_id from kwargs if available for logging and job store
        user_id = job_kwargs.get('user_id')
        func_name = getattr(func, '__name__', None) or str(func)

        # Get job store name for user (if user_id provided).
        # Note: APScheduler doesn't support dynamic job store creation.
        # We use 'default' for all jobs but log the user's job store name for debugging.
        # The actual user isolation is handled through task filtering by user_id.
        job_store_name = 'default'
        if user_id:
            try:
                db = get_db_session()
                if db:
                    try:
                        job_store_name = get_user_job_store_name(user_id, db)
                    finally:
                        # Always release the session, even if the lookup fails.
                        db.close()
            except Exception as e:
                logger.warning(f"Could not determine job store for user {user_id}: {e}")

        # Log detailed one-time task scheduling information (use WARNING level for visibility)
        log_message = (
            f"[Scheduler] 📅 Scheduled One-Time Task\n"
            f"   ├─ Job ID: {job_id}\n"
            f"   ├─ Function: {func_name}\n"
            f"   ├─ User ID: {user_id or 'system'}\n"
            f"   ├─ Job Store: {job_store_name} (user context)\n"
            f"   ├─ Scheduled For: {run_date}\n"
            f"   ├─ Replace Existing: {replace_existing}\n"
            f"   ├─ Total One-Time Jobs: {len(one_time_jobs)}\n"
            f"   └─ Total Scheduled Jobs: {len(all_jobs)}"
        )
        logger.warning(log_message)

        # Log job scheduling to event log for dashboard (best-effort)
        try:
            event_db = get_db_session()
            if event_db:
                try:
                    event_log = SchedulerEventLog(
                        event_type='job_scheduled',
                        event_date=datetime.utcnow(),
                        job_id=job_id,
                        job_type='one_time',
                        user_id=user_id,
                        event_data={
                            'function_name': func_name,
                            'job_store': job_store_name,
                            'scheduled_for': run_date.isoformat(),
                            'replace_existing': replace_existing
                        }
                    )
                    event_db.add(event_log)
                    event_db.commit()
                finally:
                    # Always release the session, even if add/commit fails.
                    event_db.close()
        except Exception as e:
            logger.debug(f"Failed to log job scheduling event: {e}")

        return job_id
    except Exception as e:
        logger.error(f"Failed to schedule one-time task {job_id}: {e}")
        raise
|
||||
|
||||
def is_running(self) -> bool:
    """Return the scheduler's internal running flag."""
    running = self._running
    return running
|
||||
|
||||
Reference in New Issue
Block a user