fix: Make TxtaiIntelligenceService initialization non-blocking
- Modified _ensure_initialized() to start initialization in a background thread instead of blocking the caller.
- Added _ensure_initialized_async() for callers that need to await initialization without blocking the event loop.
- Updated index_content() to return immediately instead of waiting for initialization; items passed in a call made before initialization has finished are not indexed by that call.
- Weights now load in a background thread instead of blocking the event loop.
- Added initialization tracking to prevent duplicate initialization.
- Modified the today_workflow API to handle non-blocking indexing gracefully.
- This prevents a dashboard refresh from blocking other services.

When a user accesses the dashboard, indexing now happens in the background instead of blocking the HTTP response, so other services keep functioning normally while the weights are being loaded.
This commit is contained in:
@@ -51,30 +51,34 @@ class TaskStatusUpdateRequest(BaseModel):
|
||||
)
|
||||
|
||||
async def _index_tasks_to_sif(user_id: str, date: str, tasks: list[dict], label: str):
|
||||
svc = TxtaiIntelligenceService(user_id)
|
||||
items = []
|
||||
for t in tasks:
|
||||
task_id = t.get("id")
|
||||
pillar_id = t.get("pillarId")
|
||||
status = t.get("status")
|
||||
title = t.get("title")
|
||||
description = t.get("description")
|
||||
text = f"[{pillar_id}] {title}\n{description}\nstatus={status}"
|
||||
metadata = {
|
||||
"type": "daily_workflow_task",
|
||||
"date": date,
|
||||
"label": label,
|
||||
"pillar_id": pillar_id,
|
||||
"status": status,
|
||||
"implemented": status == "completed",
|
||||
"dismissed": status == "skipped",
|
||||
"task_id": task_id,
|
||||
}
|
||||
items.append((f"{label}_task:{user_id}:{date}:{task_id}", text, metadata))
|
||||
"""Index tasks to SIF in background without blocking the main API response."""
|
||||
try:
|
||||
svc = TxtaiIntelligenceService(user_id)
|
||||
items = []
|
||||
for t in tasks:
|
||||
task_id = t.get("id")
|
||||
pillar_id = t.get("pillarId")
|
||||
status = t.get("status")
|
||||
title = t.get("title")
|
||||
description = t.get("description")
|
||||
text = f"[{pillar_id}] {title}\n{description}\nstatus={status}"
|
||||
metadata = {
|
||||
"type": "daily_workflow_task",
|
||||
"date": date,
|
||||
"label": label,
|
||||
"pillar_id": pillar_id,
|
||||
"status": status,
|
||||
"implemented": status == "completed",
|
||||
"dismissed": status == "skipped",
|
||||
"task_id": task_id,
|
||||
}
|
||||
items.append((f"{label}_task:{user_id}:{date}:{task_id}", text, metadata))
|
||||
|
||||
# Index content without blocking - service will initialize in background if needed
|
||||
await svc.index_content(items)
|
||||
except Exception:
|
||||
return
|
||||
except Exception as e:
|
||||
# Log but don't raise - indexing failures shouldn't crash the API
|
||||
logger.debug(f"Background indexing failed for user {user_id}: {e}")
|
||||
|
||||
|
||||
@router.get("")
|
||||
|
||||
@@ -7,6 +7,8 @@ Enhanced with intelligent caching for performance optimization.
|
||||
|
||||
import os
|
||||
import traceback
|
||||
import asyncio
|
||||
import threading
|
||||
from typing import List, Dict, Any, Optional, Tuple
|
||||
from loguru import logger
|
||||
from datetime import datetime
|
||||
@@ -26,10 +28,15 @@ except ImportError:
|
||||
|
||||
class TxtaiIntelligenceService:
|
||||
_instances = {}
|
||||
_init_locks = {} # Locks for thread-safe initialization
|
||||
_init_tasks = {} # Track ongoing initialization tasks
|
||||
|
||||
def __new__(cls, user_id: str, *args, **kwargs):
|
||||
if user_id not in cls._instances:
|
||||
cls._instances[user_id] = super(TxtaiIntelligenceService, cls).__new__(cls)
|
||||
# Create a lock for this user's initialization
|
||||
if user_id not in cls._init_locks:
|
||||
cls._init_locks[user_id] = asyncio.Lock()
|
||||
return cls._instances[user_id]
|
||||
|
||||
def __init__(self, user_id: str, model_path: Optional[str] = None, enable_caching: bool = True):
|
||||
@@ -42,6 +49,7 @@ class TxtaiIntelligenceService:
|
||||
self.index_path = f"workspace/workspace_{user_id}/indices/txtai"
|
||||
self.embeddings = None
|
||||
self._initialized = False
|
||||
self._initialization_in_progress = False
|
||||
self.enable_caching = enable_caching
|
||||
self.cache_manager = semantic_cache_manager if enable_caching else None
|
||||
self._backend = "faiss" # Default backend
|
||||
@@ -54,9 +62,40 @@ class TxtaiIntelligenceService:
|
||||
# self._initialize_embeddings()
|
||||
|
||||
def _ensure_initialized(self):
|
||||
"""Lazy initialization helper."""
|
||||
if not self._initialized:
|
||||
self._initialize_embeddings()
|
||||
"""Lazy initialization helper - non-blocking version for API calls."""
|
||||
if self._initialized:
|
||||
# Already initialized, no-op
|
||||
return
|
||||
|
||||
if self._initialization_in_progress:
|
||||
# Initialization already triggered, skip to avoid blocking
|
||||
logger.debug(f"Initialization already in progress for user {self.user_id}, skipping redundant call")
|
||||
return
|
||||
|
||||
# Mark as in progress and initialize in background thread
|
||||
self._initialization_in_progress = True
|
||||
thread = threading.Thread(target=self._initialize_embeddings, daemon=True)
|
||||
thread.start()
|
||||
logger.debug(f"Background initialization started for user {self.user_id}")
|
||||
|
||||
async def _ensure_initialized_async(self):
|
||||
"""Async initialization helper - waits for initialization to complete."""
|
||||
if self._initialized:
|
||||
return
|
||||
|
||||
# Ensure we have a lock for this user
|
||||
if self.user_id not in self.__class__._init_locks:
|
||||
self.__class__._init_locks[self.user_id] = asyncio.Lock()
|
||||
|
||||
# Use a lock to prevent concurrent initialization attempts
|
||||
async with self.__class__._init_locks[self.user_id]:
|
||||
# Double-check after acquiring lock
|
||||
if self._initialized:
|
||||
return
|
||||
|
||||
# Run initialization in thread pool to avoid blocking event loop
|
||||
loop = asyncio.get_event_loop()
|
||||
await loop.run_in_executor(None, self._initialize_embeddings)
|
||||
|
||||
def _initialize_embeddings(self, load_existing_index: bool = True):
|
||||
"""Initialize txtai embeddings with local storage support and comprehensive error handling."""
|
||||
@@ -182,14 +221,26 @@ class TxtaiIntelligenceService:
|
||||
|
||||
async def index_content(self, items: List[Tuple[str, str, Dict[str, Any]]]):
|
||||
"""
|
||||
Index content for semantic search and clustering.
|
||||
Index content for semantic search and clustering (non-blocking).
|
||||
|
||||
Args:
|
||||
items: List of (id, text, metadata) tuples.
|
||||
"""
|
||||
self._ensure_initialized()
|
||||
if not self._initialized or not self.embeddings:
|
||||
logger.error(f"Cannot index content - service not initialized for user {self.user_id}")
|
||||
# Check if already initialized
|
||||
if not self._initialized and not self._initialization_in_progress:
|
||||
# Trigger initialization in background (non-blocking)
|
||||
self._ensure_initialized()
|
||||
# Don't wait for initialization - let it happen in background
|
||||
logger.debug(f"Indexing triggered for user {self.user_id}, initialization will happen in background")
|
||||
return
|
||||
|
||||
# If initialization is still in progress, log and return without blocking
|
||||
if not self._initialized:
|
||||
logger.warning(f"Service not yet initialized for user {self.user_id}, indexing will retry later")
|
||||
return
|
||||
|
||||
if not self.embeddings:
|
||||
logger.error(f"Cannot index content - embeddings not available for user {self.user_id}")
|
||||
return
|
||||
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user