Fix onboarding-status user ID resolution in scheduler path
This commit is contained in:
@@ -76,22 +76,59 @@ def get_user_db_path(user_id: str) -> str:
|
|||||||
def get_all_user_ids() -> List[str]:
    """
    Discover all user IDs by scanning workspace directories.

    IMPORTANT:
    Workspace folder names are filesystem-safe IDs (sanitized). In some deployments,
    the canonical auth user ID stored in DB can contain characters that are removed
    during sanitization. To avoid downstream lookup mismatches (e.g. onboarding status
    checks), we resolve the canonical `user_id` from DB when possible.

    Returns:
        List of canonical user IDs when discoverable, otherwise workspace IDs.
        Entries are unique and keep directory-listing order. An empty list is
        returned when WORKSPACE_DIR does not exist or cannot be listed.
    """
    if not os.path.exists(WORKSPACE_DIR):
        return []

    prefix = "workspace_"
    try:
        # Keep only `workspace_{id}` directories that carry a non-empty id.
        workspace_ids: List[str] = [
            item[len(prefix):]
            for item in os.listdir(WORKSPACE_DIR)
            if item.startswith(prefix)
            and len(item) > len(prefix)
            and os.path.isdir(os.path.join(WORKSPACE_DIR, item))
        ]
    except Exception as e:
        logger.error(f"Error discovering user workspaces: {e}")
        return []

    # Imported lazily (as in the original) and outside the discovery `try`, so
    # that a model import failure degrades to workspace IDs instead of
    # discarding every workspace that was already found.
    try:
        from models.onboarding import OnboardingSession
    except Exception as e:
        logger.error(f"Error discovering user workspaces: {e}")
        return list(dict.fromkeys(workspace_ids))

    # dict.fromkeys de-duplicates in O(n) while preserving first-seen order.
    return list(
        dict.fromkeys(
            _resolve_canonical_user_id(workspace_id, OnboardingSession)
            for workspace_id in workspace_ids
        )
    )


def _resolve_canonical_user_id(workspace_id: str, onboarding_model) -> str:
    """
    Return the canonical user ID recorded in a workspace's DB.

    Reads the `user_id` of the most recently updated onboarding row. Falls back
    to `workspace_id` for empty/new workspaces, when no session is available,
    or when the lookup raises. Never raises for ordinary exceptions, so one
    broken workspace cannot abort resolution of the others.

    Args:
        workspace_id: Sanitized ID taken from the `workspace_{id}` folder name.
        onboarding_model: The `OnboardingSession` model class to query.
    """
    db = None
    try:
        db = get_session_for_user(workspace_id)
        if db is not None:
            onboarding_row = (
                db.query(onboarding_model.user_id)
                .order_by(onboarding_model.updated_at.desc())
                .first()
            )
            if onboarding_row and onboarding_row[0]:
                return str(onboarding_row[0])
    except Exception as resolve_error:
        logger.debug(
            f"Could not resolve canonical user_id from DB for workspace {workspace_id}: {resolve_error}"
        )
    finally:
        if db is not None:
            try:
                db.close()
            except Exception as close_error:
                # A failing close must not mask the result or stop the scan.
                logger.debug(
                    f"Could not close DB session for workspace {workspace_id}: {close_error}"
                )
    return workspace_id
|
||||||
|
|
||||||
def get_engine_for_user(user_id: str):
|
def get_engine_for_user(user_id: str):
|
||||||
|
|||||||
@@ -50,7 +50,21 @@ class OnboardingProgressService:
|
|||||||
try:
|
try:
|
||||||
# Direct DB access to SSOT session
|
# Direct DB access to SSOT session
|
||||||
session = db.query(OnboardingSession).filter(OnboardingSession.user_id == user_id).first()
|
session = db.query(OnboardingSession).filter(OnboardingSession.user_id == user_id).first()
|
||||||
|
|
||||||
|
# Fallback for sanitized/derived IDs (e.g., workspace-safe IDs)
|
||||||
|
# by comparing normalized IDs from existing onboarding rows.
|
||||||
|
if not session:
|
||||||
|
normalized_requested = ''.join(c for c in str(user_id) if c.isalnum() or c in ('-', '_'))
|
||||||
|
candidate_sessions = db.query(OnboardingSession).all()
|
||||||
|
for candidate in candidate_sessions:
|
||||||
|
candidate_user_id = str(candidate.user_id or '')
|
||||||
|
normalized_candidate = ''.join(
|
||||||
|
c for c in candidate_user_id if c.isalnum() or c in ('-', '_')
|
||||||
|
)
|
||||||
|
if normalized_candidate == normalized_requested:
|
||||||
|
session = candidate
|
||||||
|
break
|
||||||
|
|
||||||
if not session:
|
if not session:
|
||||||
return {
|
return {
|
||||||
"is_completed": False,
|
"is_completed": False,
|
||||||
|
|||||||
@@ -62,19 +62,19 @@ async def check_and_execute_due_tasks(scheduler: 'TaskScheduler'):
|
|||||||
onboarding_service = OnboardingProgressService()
|
onboarding_service = OnboardingProgressService()
|
||||||
status = onboarding_service.get_onboarding_status(user_id)
|
status = onboarding_service.get_onboarding_status(user_id)
|
||||||
|
|
||||||
if not status.get("is_completed", False):
|
onboarding_completed = status.get("is_completed", False)
|
||||||
# Skip logging for inactive users to reduce noise, unless debugging
|
|
||||||
# logger.debug(f"[Scheduler Check] Skipping user {user_id} - Onboarding incomplete")
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Check active strategies for this user (for interval adjustment)
|
# Check active strategies only after onboarding completion.
|
||||||
try:
|
# Task execution below is not hard-gated by onboarding state so recurring
|
||||||
from services.active_strategy_service import ActiveStrategyService
|
# system tasks (e.g., token monitoring) still run and surface correctly.
|
||||||
active_strategy_service = ActiveStrategyService(db_session=db)
|
if onboarding_completed:
|
||||||
user_active_strategies = active_strategy_service.count_active_strategies_with_tasks()
|
try:
|
||||||
total_active_strategies += user_active_strategies
|
from services.active_strategy_service import ActiveStrategyService
|
||||||
except Exception as e:
|
active_strategy_service = ActiveStrategyService(db_session=db)
|
||||||
logger.warning(f"Error counting active strategies for user {user_id}: {e}")
|
user_active_strategies = active_strategy_service.count_active_strategies_with_tasks()
|
||||||
|
total_active_strategies += user_active_strategies
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Error counting active strategies for user {user_id}: {e}")
|
||||||
|
|
||||||
# Phase 2B: Real-time semantic health monitoring (runs every 24 hours)
|
# Phase 2B: Real-time semantic health monitoring (runs every 24 hours)
|
||||||
# Check if 24 hours have passed since last check
|
# Check if 24 hours have passed since last check
|
||||||
@@ -106,11 +106,7 @@ async def check_and_execute_due_tasks(scheduler: 'TaskScheduler'):
|
|||||||
registered_types = scheduler.registry.get_registered_types()
|
registered_types = scheduler.registry.get_registered_types()
|
||||||
for task_type in registered_types:
|
for task_type in registered_types:
|
||||||
# Pass the user-specific session
|
# Pass the user-specific session
|
||||||
type_summary = await scheduler._process_task_type(task_type, db, cycle_summary, user_id=user_id)
|
await scheduler._process_task_type(task_type, db, cycle_summary, user_id=user_id)
|
||||||
if type_summary:
|
|
||||||
cycle_summary['tasks_found_by_type'][task_type] = cycle_summary['tasks_found_by_type'].get(task_type, 0) + type_summary.get('found', 0)
|
|
||||||
cycle_summary['tasks_executed_by_type'][task_type] = cycle_summary['tasks_executed_by_type'].get(task_type, 0) + type_summary.get('executed', 0)
|
|
||||||
cycle_summary['tasks_failed_by_type'][task_type] = cycle_summary['tasks_failed_by_type'].get(task_type, 0) + type_summary.get('failed', 0)
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"[Scheduler Check] Error processing user {user_id}: {e}")
|
logger.error(f"[Scheduler Check] Error processing user {user_id}: {e}")
|
||||||
|
|||||||
@@ -700,7 +700,15 @@ class TaskScheduler:
|
|||||||
return summary
|
return summary
|
||||||
|
|
||||||
try:
|
try:
|
||||||
tasks = task_loader(db)
|
# Pass user_id for strict user isolation where loaders support it.
|
||||||
|
# Keep backward compatibility with legacy loaders that only accept db.
|
||||||
|
import inspect
|
||||||
|
loader_signature = inspect.signature(task_loader)
|
||||||
|
if "user_id" in loader_signature.parameters:
|
||||||
|
tasks = task_loader(db, user_id=user_id)
|
||||||
|
else:
|
||||||
|
tasks = task_loader(db)
|
||||||
|
|
||||||
if not tasks:
|
if not tasks:
|
||||||
return summary
|
return summary
|
||||||
|
|
||||||
@@ -749,6 +757,8 @@ class TaskScheduler:
|
|||||||
+ summary["failed"]
|
+ summary["failed"]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
self.stats["tasks_found"] += summary["found"]
|
||||||
|
|
||||||
return summary
|
return summary
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
error = TaskLoaderError(
|
error = TaskLoaderError(
|
||||||
|
|||||||
Reference in New Issue
Block a user