def _build_sif_items(user_id: str, date: str, tasks: list[dict], label: str) -> list[tuple]:
    """Convert workflow task dicts into ``(id, text, metadata)`` tuples for indexing."""
    items = []
    for task in tasks:
        task_id = task.get("id")
        pillar_id = task.get("pillarId")
        status = task.get("status")
        title = task.get("title")
        description = task.get("description")
        text = f"[{pillar_id}] {title}\n{description}\nstatus={status}"
        metadata = {
            "type": "daily_workflow_task",
            "date": date,
            "label": label,
            "pillar_id": pillar_id,
            "status": status,
            # Derived flags so consumers can filter without re-parsing status.
            "implemented": status == "completed",
            "dismissed": status == "skipped",
            "task_id": task_id,
        }
        items.append((f"{label}_task:{user_id}:{date}:{task_id}", text, metadata))
    return items


async def _index_tasks_to_sif(user_id: str, date: str, tasks: list[dict], label: str):
    """Index daily workflow tasks to SIF on a best-effort basis.

    Args:
        user_id: Owner of the tasks; selects the per-user intelligence service.
        date: Workflow date, embedded in each item id and its metadata.
        tasks: Task dicts; the keys read are ``id``, ``pillarId``, ``status``,
            ``title`` and ``description`` (missing keys become ``None``).
        label: Prefix for the item id, also stored in the metadata.

    Returns:
        None. Any exception raised while indexing is logged and swallowed so
        that an indexing failure never fails the calling API request.
    """
    # Nothing to index: skip constructing the service entirely.
    if not tasks:
        return

    try:
        svc = TxtaiIntelligenceService(user_id)
        items = _build_sif_items(user_id, date, tasks, label)
        # NOTE(review): index_content appears to return early while the
        # service is still initializing, so these items may be dropped on the
        # first call - confirm whether a retry is needed.
        await svc.index_content(items)
    except Exception as e:
        # Log but don't raise - indexing failures shouldn't crash the API.
        # Warning level keeps lost indexing visible in normal log output.
        logger.warning(f"Background indexing failed for user {user_id}: {e}")
# Methods of TxtaiIntelligenceService, reconstructed from the collapsed diff.
# SOURCE carries no usable indentation, so they are written at column 0;
# indent them one level when placing them inside the class body.

def __new__(cls, user_id: str, *args, **kwargs):
    """Return the per-user singleton, creating it (and its init lock) on first use."""
    if user_id not in cls._instances:
        cls._instances[user_id] = super(TxtaiIntelligenceService, cls).__new__(cls)
        # Create a lock for this user's initialization
        if user_id not in cls._init_locks:
            cls._init_locks[user_id] = asyncio.Lock()
    return cls._instances[user_id]


def _ensure_initialized(self):
    """Start lazy initialization in a daemon thread without blocking the caller.

    No-op when the service is already initialized or an initialization is
    already running. The in-progress flag is always cleared when the thread
    finishes, so a failed initialization can be retried by a later call
    instead of leaving the service permanently marked as "in progress".
    """
    if self._initialized:
        # Already initialized, no-op
        return

    if self._initialization_in_progress:
        # Initialization already triggered, skip to avoid blocking
        logger.debug(f"Initialization already in progress for user {self.user_id}, skipping redundant call")
        return

    # Mark as in progress before the thread starts so later callers back off.
    self._initialization_in_progress = True

    def _run_initialization():
        try:
            self._initialize_embeddings()
        finally:
            # Clear the flag on success and on failure alike.
            self._initialization_in_progress = False

    try:
        threading.Thread(target=_run_initialization, daemon=True).start()
    except RuntimeError:
        # The thread could not be started; do not leave the flag stuck.
        self._initialization_in_progress = False
        raise
    logger.debug(f"Background initialization started for user {self.user_id}")


async def _ensure_initialized_async(self):
    """Wait until the service is initialized without blocking the event loop.

    If a background initialization started by ``_ensure_initialized`` is
    running, this waits for it rather than starting a second one. Otherwise
    it runs ``_initialize_embeddings`` in the default executor. Exceptions
    raised by ``_initialize_embeddings`` propagate to the caller.
    """
    if self._initialized:
        return

    # Ensure we have a lock for this user
    locks = self.__class__._init_locks
    if self.user_id not in locks:
        locks[self.user_id] = asyncio.Lock()

    poll_interval = 0.05  # seconds between checks on a background thread

    # Use a lock to prevent concurrent initialization attempts
    async with locks[self.user_id]:
        # Let an in-flight background initialization finish first.
        while self._initialization_in_progress and not self._initialized:
            await asyncio.sleep(poll_interval)

        # Re-check: another task or the background thread may have finished.
        if self._initialized:
            return

        # Publish the flag so _ensure_initialized does not start a thread
        # while the executor is working.
        self._initialization_in_progress = True
        try:
            # Run initialization in thread pool to avoid blocking event loop
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, self._initialize_embeddings)
        finally:
            self._initialization_in_progress = False
""" - self._ensure_initialized() - if not self._initialized or not self.embeddings: - logger.error(f"Cannot index content - service not initialized for user {self.user_id}") + # Check if already initialized + if not self._initialized and not self._initialization_in_progress: + # Trigger initialization in background (non-blocking) + self._ensure_initialized() + # Don't wait for initialization - let it happen in background + logger.debug(f"Indexing triggered for user {self.user_id}, initialization will happen in background") + return + + # If initialization is still in progress, log and return without blocking + if not self._initialized: + logger.warning(f"Service not yet initialized for user {self.user_id}, indexing will retry later") + return + + if not self.embeddings: + logger.error(f"Cannot index content - embeddings not available for user {self.user_id}") return try: