From 52563849d56967cfa594e59605dafbef29a7a65d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D9=8A?= Date: Sat, 7 Mar 2026 12:15:25 +0530 Subject: [PATCH] Fix onboarding-status user ID resolution in scheduler path --- backend/services/database.py | 53 ++++++++++++++++--- .../services/onboarding/progress_service.py | 16 +++++- .../scheduler/core/check_cycle_handler.py | 30 +++++------ backend/services/scheduler/core/scheduler.py | 12 ++++- 4 files changed, 84 insertions(+), 27 deletions(-) diff --git a/backend/services/database.py b/backend/services/database.py index 7e6ee57e..9500f039 100644 --- a/backend/services/database.py +++ b/backend/services/database.py @@ -76,22 +76,59 @@ def get_user_db_path(user_id: str) -> str: def get_all_user_ids() -> List[str]: """ Discover all user IDs by scanning workspace directories. - Returns a list of user_ids (e.g., 'user_2p...', 'user_123'). + + IMPORTANT: + Workspace folder names are filesystem-safe IDs (sanitized). In some deployments, + the canonical auth user ID stored in DB can contain characters that are removed + during sanitization. To avoid downstream lookup mismatches (e.g. onboarding status + checks), we resolve the canonical `user_id` from DB when possible. + + Returns: + List of canonical user IDs when discoverable, otherwise workspace IDs. """ - user_ids = [] + user_ids: List[str] = [] if not os.path.exists(WORKSPACE_DIR): return [] - + try: + workspace_ids: List[str] = [] for item in os.listdir(WORKSPACE_DIR): if item.startswith("workspace_") and os.path.isdir(os.path.join(WORKSPACE_DIR, item)): - # Extract user_id from workspace_{user_id} - user_id = item[len("workspace_"):] - if user_id: - user_ids.append(user_id) + workspace_id = item[len("workspace_"):] + if workspace_id: + workspace_ids.append(workspace_id) + + # Resolve canonical IDs from DB rows when available. + # Falls back to workspace ID for empty/new workspaces. + from models.onboarding import OnboardingSession + + for workspace_id in workspace_ids: + canonical_user_id = workspace_id + db = None + try: + db = get_session_for_user(workspace_id) + if db: + onboarding_row = ( + db.query(OnboardingSession.user_id) + .order_by(OnboardingSession.updated_at.desc()) + .first() + ) + if onboarding_row and onboarding_row[0]: + canonical_user_id = str(onboarding_row[0]) + except Exception as resolve_error: + logger.debug( + f"Could not resolve canonical user_id from DB for workspace {workspace_id}: {resolve_error}" + ) + finally: + if db: + db.close() + + if canonical_user_id not in user_ids: + user_ids.append(canonical_user_id) + except Exception as e: logger.error(f"Error discovering user workspaces: {e}") - + return user_ids def get_engine_for_user(user_id: str): diff --git a/backend/services/onboarding/progress_service.py b/backend/services/onboarding/progress_service.py index bc2c29a5..29ff4bc2 100644 --- a/backend/services/onboarding/progress_service.py +++ b/backend/services/onboarding/progress_service.py @@ -50,7 +50,21 @@ class OnboardingProgressService: try: # Direct DB access to SSOT session session = db.query(OnboardingSession).filter(OnboardingSession.user_id == user_id).first() - + + # Fallback for sanitized/derived IDs (e.g., workspace-safe IDs) + # by comparing normalized IDs from existing onboarding rows. + if not session: + normalized_requested = ''.join(c for c in str(user_id) if c.isalnum() or c in ('-', '_')) + candidate_sessions = db.query(OnboardingSession).all() + for candidate in candidate_sessions: + candidate_user_id = str(candidate.user_id or '') + normalized_candidate = ''.join( + c for c in candidate_user_id if c.isalnum() or c in ('-', '_') + ) + if normalized_candidate == normalized_requested: + session = candidate + break + if not session: return { "is_completed": False, diff --git a/backend/services/scheduler/core/check_cycle_handler.py b/backend/services/scheduler/core/check_cycle_handler.py index 6008df94..64a58489 100644 --- a/backend/services/scheduler/core/check_cycle_handler.py +++ b/backend/services/scheduler/core/check_cycle_handler.py @@ -62,19 +62,19 @@ async def check_and_execute_due_tasks(scheduler: 'TaskScheduler'): onboarding_service = OnboardingProgressService() status = onboarding_service.get_onboarding_status(user_id) - if not status.get("is_completed", False): - # Skip logging for inactive users to reduce noise, unless debugging - # logger.debug(f"[Scheduler Check] Skipping user {user_id} - Onboarding incomplete") - continue + onboarding_completed = status.get("is_completed", False) - # Check active strategies for this user (for interval adjustment) - try: - from services.active_strategy_service import ActiveStrategyService - active_strategy_service = ActiveStrategyService(db_session=db) - user_active_strategies = active_strategy_service.count_active_strategies_with_tasks() - total_active_strategies += user_active_strategies - except Exception as e: - logger.warning(f"Error counting active strategies for user {user_id}: {e}") + # Check active strategies only after onboarding completion. + # Task execution below is not hard-gated by onboarding state so recurring + # system tasks (e.g., token monitoring) still run and surface correctly. + if onboarding_completed: + try: + from services.active_strategy_service import ActiveStrategyService + active_strategy_service = ActiveStrategyService(db_session=db) + user_active_strategies = active_strategy_service.count_active_strategies_with_tasks() + total_active_strategies += user_active_strategies + except Exception as e: + logger.warning(f"Error counting active strategies for user {user_id}: {e}") # Phase 2B: Real-time semantic health monitoring (runs every 24 hours) # Check if 24 hours have passed since last check @@ -106,11 +106,7 @@ async def check_and_execute_due_tasks(scheduler: 'TaskScheduler'): registered_types = scheduler.registry.get_registered_types() for task_type in registered_types: # Pass the user-specific session - type_summary = await scheduler._process_task_type(task_type, db, cycle_summary, user_id=user_id) - if type_summary: - cycle_summary['tasks_found_by_type'][task_type] = cycle_summary['tasks_found_by_type'].get(task_type, 0) + type_summary.get('found', 0) - cycle_summary['tasks_executed_by_type'][task_type] = cycle_summary['tasks_executed_by_type'].get(task_type, 0) + type_summary.get('executed', 0) - cycle_summary['tasks_failed_by_type'][task_type] = cycle_summary['tasks_failed_by_type'].get(task_type, 0) + type_summary.get('failed', 0) + await scheduler._process_task_type(task_type, db, cycle_summary, user_id=user_id) except Exception as e: logger.error(f"[Scheduler Check] Error processing user {user_id}: {e}") diff --git a/backend/services/scheduler/core/scheduler.py b/backend/services/scheduler/core/scheduler.py index 564449e7..a230df50 100644 --- a/backend/services/scheduler/core/scheduler.py +++ b/backend/services/scheduler/core/scheduler.py @@ -700,7 +700,15 @@ class TaskScheduler: return summary try: - tasks = task_loader(db) + # Pass user_id for strict user isolation where loaders support it. + # Keep backward compatibility with legacy loaders that only accept db. + import inspect + loader_signature = inspect.signature(task_loader) + if "user_id" in loader_signature.parameters: + tasks = task_loader(db, user_id=user_id) + else: + tasks = task_loader(db) + if not tasks: return summary @@ -749,6 +757,8 @@ class TaskScheduler: + summary["failed"] ) + self.stats["tasks_found"] += summary["found"] + return summary except Exception as e: error = TaskLoaderError(