From cd8582eb8c8f57b6d9d906403180085ae9cd0850 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D9=8A?= Date: Mon, 9 Mar 2026 12:21:43 +0530 Subject: [PATCH] Fix txtai nprobe fallback to avoid reloading incompatible faiss index --- .../services/intelligence/txtai_service.py | 87 +++++++++++++++---- 1 file changed, 68 insertions(+), 19 deletions(-) diff --git a/backend/services/intelligence/txtai_service.py b/backend/services/intelligence/txtai_service.py index 430a844b..e1b910d7 100644 --- a/backend/services/intelligence/txtai_service.py +++ b/backend/services/intelligence/txtai_service.py @@ -57,7 +57,7 @@ class TxtaiIntelligenceService: if not self._initialized: self._initialize_embeddings() - def _initialize_embeddings(self): + def _initialize_embeddings(self, load_existing_index: bool = True): """Initialize txtai embeddings with local storage support and comprehensive error handling.""" if not TXTAI_AVAILABLE: logger.error("txtai is not available. Please install with: pip install txtai[pipeline,similarity]") @@ -96,7 +96,7 @@ class TxtaiIntelligenceService: logger.info("Embeddings instance created successfully") # Check if existing index exists and load it - if os.path.exists(self.index_path): + if load_existing_index and os.path.exists(self.index_path): logger.info(f"Loading existing txtai index from {self.index_path}") try: self.embeddings.load(self.index_path) @@ -119,8 +119,13 @@ class TxtaiIntelligenceService: "gpu": False, "limit": 1000 }) - else: + elif load_existing_index: logger.info(f"No existing index found. Creating new txtai index for user {self.user_id}") + else: + logger.info( + f"Skipping existing txtai index load for user {self.user_id} " + f"(backend={self._backend}, load_existing_index={load_existing_index})" + ) self._initialized = True logger.info(f"Txtai Intelligence Service initialized successfully for user {self.user_id}") @@ -134,6 +139,20 @@ class TxtaiIntelligenceService: logger.error("3. Missing dependencies - try: pip install txtai[pipeline,similarity]") self._initialized = False + @staticmethod + def _is_nprobe_incompatibility(error: Exception) -> bool: + """Detect known FAISS IndexIDMap/nprobe incompatibility.""" + message = str(error) + return "nprobe" in message and "IndexIDMap" in message + + def _switch_to_numpy_backend(self, load_existing_index: bool = False): + """Switch embedding backend to numpy and reinitialize service state.""" + if self._backend != "numpy": + logger.warning(f"Switching txtai backend to numpy for user {self.user_id}") + self._backend = "numpy" + self._initialized = False + self._initialize_embeddings(load_existing_index=load_existing_index) + async def index_content(self, items: List[Tuple[str, str, Dict[str, Any]]]): """ Index content for semantic search and clustering. @@ -211,14 +230,22 @@ class TxtaiIntelligenceService: try: results = self.embeddings.search(query, limit=limit) except AttributeError as ae: - if "nprobe" in str(ae): + if self._is_nprobe_incompatibility(ae): logger.error(f"Detected known txtai/faiss IndexIDMap/nprobe incompatibility for user {self.user_id}. Attempting re-init with numpy backend fallback...") - # Switch to numpy backend which doesn't have this issue - self._backend = "numpy" - self._initialized = False - self._initialize_embeddings() + # Switch to numpy backend and skip loading persisted ANN index + self._switch_to_numpy_backend(load_existing_index=False) if self.embeddings: - results = self.embeddings.search(query, limit=limit) + try: + results = self.embeddings.search(query, limit=limit) + except AttributeError as retry_ae: + if self._is_nprobe_incompatibility(retry_ae): + logger.warning( + f"Retry still hit nprobe incompatibility for user {self.user_id}; " + f"forcing non-ANN search path." + ) + results = self.embeddings.search(query, limit=limit, index=False) + else: + raise retry_ae else: raise ae else: @@ -269,14 +296,38 @@ class TxtaiIntelligenceService: try: similarity = self.embeddings.similarity(text1, text2) except AttributeError as ae: - if "nprobe" in str(ae): + if self._is_nprobe_incompatibility(ae): logger.error(f"Detected IndexIDMap nprobe error in similarity for user {self.user_id}. Falling back to numpy backend...") - # Switch to numpy backend which doesn't have this issue - self._backend = "numpy" - self._initialized = False - self._initialize_embeddings() + self._switch_to_numpy_backend(load_existing_index=False) if self.embeddings: - similarity = self.embeddings.similarity(text1, text2) + try: + similarity = self.embeddings.similarity(text1, text2) + except AttributeError as retry_ae: + if self._is_nprobe_incompatibility(retry_ae): + logger.warning( + f"Similarity retry still hit nprobe incompatibility for user {self.user_id}; " + f"using vector cosine fallback." + ) + vectors = self.embeddings.transform([text1, text2]) + if vectors is None or len(vectors) < 2: + return 0.0 + + try: + # Handle list or numpy array vectors consistently + import math + v1, v2 = vectors[0], vectors[1] + dot_product = sum(a * b for a, b in zip(v1, v2)) + norm_v1 = math.sqrt(sum(a * a for a in v1)) + norm_v2 = math.sqrt(sum(b * b for b in v2)) + if norm_v1 == 0 or norm_v2 == 0: + similarity = 0.0 + else: + similarity = dot_product / (norm_v1 * norm_v2) + except Exception as vector_error: + logger.error(f"Cosine fallback failed for user {self.user_id}: {vector_error}") + similarity = 0.0 + else: + raise retry_ae else: raise ae else: @@ -339,12 +390,10 @@ class TxtaiIntelligenceService: try: graph_results = self.embeddings.search(sample_query, limit=10, graph=True) except AttributeError as ae: - if "nprobe" in str(ae): + if self._is_nprobe_incompatibility(ae): logger.error(f"Detected IndexIDMap nprobe error in cluster for user {self.user_id}. Falling back to numpy backend...") # Force re-initialization with numpy backend to bypass FAISS issue - self._backend = "numpy" - self._initialized = False - self._initialize_embeddings() + self._switch_to_numpy_backend(load_existing_index=False) if self.embeddings: # Retry with numpy backend (no graph support, so fallback) return await self._fallback_clustering(min_score)