Merge branch 'pr-398'

This commit is contained in:
ajaysi
2026-03-09 13:50:12 +05:30
4 changed files with 84 additions and 27 deletions

View File

@@ -76,22 +76,59 @@ def get_user_db_path(user_id: str) -> str:
def get_all_user_ids() -> List[str]:
"""
Discover all user IDs by scanning workspace directories.
Returns a list of user_ids (e.g., 'user_2p...', 'user_123').
IMPORTANT:
Workspace folder names are filesystem-safe IDs (sanitized). In some deployments,
the canonical auth user ID stored in DB can contain characters that are removed
during sanitization. To avoid downstream lookup mismatches (e.g. onboarding status
checks), we resolve the canonical `user_id` from DB when possible.
Returns:
List of canonical user IDs when discoverable, otherwise workspace IDs.
"""
user_ids = []
user_ids: List[str] = []
if not os.path.exists(WORKSPACE_DIR):
return []
try:
workspace_ids: List[str] = []
for item in os.listdir(WORKSPACE_DIR):
if item.startswith("workspace_") and os.path.isdir(os.path.join(WORKSPACE_DIR, item)):
# Extract user_id from workspace_{user_id}
user_id = item[len("workspace_"):]
if user_id:
user_ids.append(user_id)
workspace_id = item[len("workspace_"):]
if workspace_id:
workspace_ids.append(workspace_id)
# Resolve canonical IDs from DB rows when available.
# Falls back to workspace ID for empty/new workspaces.
from models.onboarding import OnboardingSession
for workspace_id in workspace_ids:
canonical_user_id = workspace_id
db = None
try:
db = get_session_for_user(workspace_id)
if db:
onboarding_row = (
db.query(OnboardingSession.user_id)
.order_by(OnboardingSession.updated_at.desc())
.first()
)
if onboarding_row and onboarding_row[0]:
canonical_user_id = str(onboarding_row[0])
except Exception as resolve_error:
logger.debug(
f"Could not resolve canonical user_id from DB for workspace {workspace_id}: {resolve_error}"
)
finally:
if db:
db.close()
if canonical_user_id not in user_ids:
user_ids.append(canonical_user_id)
except Exception as e:
logger.error(f"Error discovering user workspaces: {e}")
return user_ids
def get_engine_for_user(user_id: str):

View File

@@ -50,7 +50,21 @@ class OnboardingProgressService:
try:
# Direct DB access to SSOT session
session = db.query(OnboardingSession).filter(OnboardingSession.user_id == user_id).first()
# Fallback for sanitized/derived IDs (e.g., workspace-safe IDs)
# by comparing normalized IDs from existing onboarding rows.
if not session:
normalized_requested = ''.join(c for c in str(user_id) if c.isalnum() or c in ('-', '_'))
candidate_sessions = db.query(OnboardingSession).all()
for candidate in candidate_sessions:
candidate_user_id = str(candidate.user_id or '')
normalized_candidate = ''.join(
c for c in candidate_user_id if c.isalnum() or c in ('-', '_')
)
if normalized_candidate == normalized_requested:
session = candidate
break
if not session:
return {
"is_completed": False,

View File

@@ -62,19 +62,19 @@ async def check_and_execute_due_tasks(scheduler: 'TaskScheduler'):
onboarding_service = OnboardingProgressService()
status = onboarding_service.get_onboarding_status(user_id)
if not status.get("is_completed", False):
# Skip logging for inactive users to reduce noise, unless debugging
# logger.debug(f"[Scheduler Check] Skipping user {user_id} - Onboarding incomplete")
continue
onboarding_completed = status.get("is_completed", False)
# Check active strategies for this user (for interval adjustment)
try:
from services.active_strategy_service import ActiveStrategyService
active_strategy_service = ActiveStrategyService(db_session=db)
user_active_strategies = active_strategy_service.count_active_strategies_with_tasks()
total_active_strategies += user_active_strategies
except Exception as e:
logger.warning(f"Error counting active strategies for user {user_id}: {e}")
# Check active strategies only after onboarding completion.
# Task execution below is not hard-gated by onboarding state so recurring
# system tasks (e.g., token monitoring) still run and surface correctly.
if onboarding_completed:
try:
from services.active_strategy_service import ActiveStrategyService
active_strategy_service = ActiveStrategyService(db_session=db)
user_active_strategies = active_strategy_service.count_active_strategies_with_tasks()
total_active_strategies += user_active_strategies
except Exception as e:
logger.warning(f"Error counting active strategies for user {user_id}: {e}")
# Phase 2B: Real-time semantic health monitoring (runs every 24 hours)
# Check if 24 hours have passed since last check
@@ -106,11 +106,7 @@ async def check_and_execute_due_tasks(scheduler: 'TaskScheduler'):
registered_types = scheduler.registry.get_registered_types()
for task_type in registered_types:
# Pass the user-specific session
type_summary = await scheduler._process_task_type(task_type, db, cycle_summary, user_id=user_id)
if type_summary:
cycle_summary['tasks_found_by_type'][task_type] = cycle_summary['tasks_found_by_type'].get(task_type, 0) + type_summary.get('found', 0)
cycle_summary['tasks_executed_by_type'][task_type] = cycle_summary['tasks_executed_by_type'].get(task_type, 0) + type_summary.get('executed', 0)
cycle_summary['tasks_failed_by_type'][task_type] = cycle_summary['tasks_failed_by_type'].get(task_type, 0) + type_summary.get('failed', 0)
await scheduler._process_task_type(task_type, db, cycle_summary, user_id=user_id)
except Exception as e:
logger.error(f"[Scheduler Check] Error processing user {user_id}: {e}")

View File

@@ -700,7 +700,15 @@ class TaskScheduler:
return summary
try:
tasks = task_loader(db)
# Pass user_id for strict user isolation where loaders support it.
# Keep backward compatibility with legacy loaders that only accept db.
import inspect
loader_signature = inspect.signature(task_loader)
if "user_id" in loader_signature.parameters:
tasks = task_loader(db, user_id=user_id)
else:
tasks = task_loader(db)
if not tasks:
return summary
@@ -749,6 +757,8 @@ class TaskScheduler:
+ summary["failed"]
)
self.stats["tasks_found"] += summary["found"]
return summary
except Exception as e:
error = TaskLoaderError(