From 952824a271529b4545a90684c084e504fb2407f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D9=8A?= Date: Mon, 9 Mar 2026 12:27:16 +0530 Subject: [PATCH] Stabilize txtai nprobe handling without dropping loaded index state --- .../services/intelligence/txtai_service.py | 136 +++++++----------- 1 file changed, 55 insertions(+), 81 deletions(-) diff --git a/backend/services/intelligence/txtai_service.py b/backend/services/intelligence/txtai_service.py index e1b910d7..59caec9d 100644 --- a/backend/services/intelligence/txtai_service.py +++ b/backend/services/intelligence/txtai_service.py @@ -45,6 +45,7 @@ class TxtaiIntelligenceService: self.enable_caching = enable_caching self.cache_manager = semantic_cache_manager if enable_caching else None self._backend = "faiss" # Default backend + self._disable_ann_queries = False # Set when FAISS nprobe incompatibility is detected # Mark as initialized for singleton pattern self._singleton_initialized = True @@ -127,6 +128,7 @@ class TxtaiIntelligenceService: f"(backend={self._backend}, load_existing_index={load_existing_index})" ) + self._disable_ann_queries = False self._initialized = True logger.info(f"Txtai Intelligence Service initialized successfully for user {self.user_id}") @@ -145,13 +147,38 @@ class TxtaiIntelligenceService: message = str(error) return "nprobe" in message and "IndexIDMap" in message - def _switch_to_numpy_backend(self, load_existing_index: bool = False): - """Switch embedding backend to numpy and reinitialize service state.""" - if self._backend != "numpy": - logger.warning(f"Switching txtai backend to numpy for user {self.user_id}") - self._backend = "numpy" - self._initialized = False - self._initialize_embeddings(load_existing_index=load_existing_index) + def _mark_ann_incompatible(self): + """Disable ANN-dependent code paths after FAISS nprobe incompatibility is observed.""" + if not self._disable_ann_queries: + logger.warning( + f"Disabling ANN-dependent txtai queries for user {self.user_id} due to IndexIDMap/nprobe incompatibility" + ) + self._disable_ann_queries = True + + def _search_with_ann_fallback(self, query: str, limit: int, graph: bool = False): + """Run search with ANN when available, then fall back to scan search when needed.""" + try: + if self._disable_ann_queries: + return self.embeddings.search(query, limit=limit, graph=graph, index=False) + return self.embeddings.search(query, limit=limit, graph=graph) + except AttributeError as ae: + if not self._is_nprobe_incompatibility(ae): + raise ae + + self._mark_ann_incompatible() + return self.embeddings.search(query, limit=limit, graph=graph, index=False) + + @staticmethod + def _cosine_similarity_from_vectors(v1, v2) -> float: + """Compute cosine similarity for two embedding vectors.""" + import math + + dot_product = sum(a * b for a, b in zip(v1, v2)) + norm_v1 = math.sqrt(sum(a * a for a in v1)) + norm_v2 = math.sqrt(sum(b * b for b in v2)) + if norm_v1 == 0 or norm_v2 == 0: + return 0.0 + return dot_product / (norm_v1 * norm_v2) async def index_content(self, items: List[Tuple[str, str, Dict[str, Any]]]): """ @@ -227,29 +254,7 @@ class TxtaiIntelligenceService: logger.debug(f"Cache miss for search query: '{query}'") logger.debug(f"Searching for query: '{query}' with limit: {limit}") - try: - results = self.embeddings.search(query, limit=limit) - except AttributeError as ae: - if self._is_nprobe_incompatibility(ae): - logger.error(f"Detected known txtai/faiss IndexIDMap/nprobe incompatibility for user {self.user_id}. Attempting re-init with numpy backend fallback...") - # Switch to numpy backend and skip loading persisted ANN index - self._switch_to_numpy_backend(load_existing_index=False) - if self.embeddings: - try: - results = self.embeddings.search(query, limit=limit) - except AttributeError as retry_ae: - if self._is_nprobe_incompatibility(retry_ae): - logger.warning( - f"Retry still hit nprobe incompatibility for user {self.user_id}; " - f"forcing non-ANN search path." - ) - results = self.embeddings.search(query, limit=limit, index=False) - else: - raise retry_ae - else: - raise ae - else: - raise ae + results = self._search_with_ann_fallback(query, limit=limit) # Cache the results if caching is enabled if self.enable_caching and self.cache_manager and results: @@ -293,45 +298,27 @@ class TxtaiIntelligenceService: logger.debug(f"Cache miss for similarity calculation") logger.debug(f"Calculating similarity between texts: '{text1[:50]}...' and '{text2[:50]}...'") - try: - similarity = self.embeddings.similarity(text1, text2) - except AttributeError as ae: - if self._is_nprobe_incompatibility(ae): - logger.error(f"Detected IndexIDMap nprobe error in similarity for user {self.user_id}. Falling back to numpy backend...") - self._switch_to_numpy_backend(load_existing_index=False) - if self.embeddings: - try: - similarity = self.embeddings.similarity(text1, text2) - except AttributeError as retry_ae: - if self._is_nprobe_incompatibility(retry_ae): - logger.warning( - f"Similarity retry still hit nprobe incompatibility for user {self.user_id}; " - f"using vector cosine fallback." - ) - vectors = self.embeddings.transform([text1, text2]) - if vectors is None or len(vectors) < 2: - return 0.0 - - try: - # Handle list or numpy array vectors consistently - import math - v1, v2 = vectors[0], vectors[1] - dot_product = sum(a * b for a, b in zip(v1, v2)) - norm_v1 = math.sqrt(sum(a * a for a in v1)) - norm_v2 = math.sqrt(sum(b * b for b in v2)) - if norm_v1 == 0 or norm_v2 == 0: - similarity = 0.0 - else: - similarity = dot_product / (norm_v1 * norm_v2) - except Exception as vector_error: - logger.error(f"Cosine fallback failed for user {self.user_id}: {vector_error}") - similarity = 0.0 - else: - raise retry_ae + if self._disable_ann_queries: + vectors = self.embeddings.transform([text1, text2]) + if vectors is None or len(vectors) < 2: + return 0.0 + similarity = self._cosine_similarity_from_vectors(vectors[0], vectors[1]) + else: + try: + similarity = self.embeddings.similarity(text1, text2) + except AttributeError as ae: + if self._is_nprobe_incompatibility(ae): + logger.error( + f"Detected IndexIDMap nprobe error in similarity for user {self.user_id}. " + f"Using vector cosine fallback." + ) + self._mark_ann_incompatible() + vectors = self.embeddings.transform([text1, text2]) + if vectors is None or len(vectors) < 2: + return 0.0 + similarity = self._cosine_similarity_from_vectors(vectors[0], vectors[1]) else: raise ae - else: - raise ae # Cache the similarity result if self.enable_caching and self.cache_manager: @@ -387,20 +374,7 @@ class TxtaiIntelligenceService: # Use graph-based clustering if available # Perform a search to get graph structure sample_query = "content marketing digital strategy" - try: - graph_results = self.embeddings.search(sample_query, limit=10, graph=True) - except AttributeError as ae: - if self._is_nprobe_incompatibility(ae): - logger.error(f"Detected IndexIDMap nprobe error in cluster for user {self.user_id}. Falling back to numpy backend...") - # Force re-initialization with numpy backend to bypass FAISS issue - self._switch_to_numpy_backend(load_existing_index=False) - if self.embeddings: - # Retry with numpy backend (no graph support, so fallback) - return await self._fallback_clustering(min_score) - else: - raise ae - else: - raise ae + graph_results = self._search_with_ann_fallback(sample_query, limit=10, graph=True) if not graph_results: logger.warning(f"No graph results for clustering user {self.user_id}")