diff --git a/backend/services/intelligence/txtai_service.py b/backend/services/intelligence/txtai_service.py index 430a844b..59caec9d 100644 --- a/backend/services/intelligence/txtai_service.py +++ b/backend/services/intelligence/txtai_service.py @@ -45,6 +45,7 @@ class TxtaiIntelligenceService: self.enable_caching = enable_caching self.cache_manager = semantic_cache_manager if enable_caching else None self._backend = "faiss" # Default backend + self._disable_ann_queries = False # Set when FAISS nprobe incompatibility is detected # Mark as initialized for singleton pattern self._singleton_initialized = True @@ -57,7 +58,7 @@ class TxtaiIntelligenceService: if not self._initialized: self._initialize_embeddings() - def _initialize_embeddings(self): + def _initialize_embeddings(self, load_existing_index: bool = True): """Initialize txtai embeddings with local storage support and comprehensive error handling.""" if not TXTAI_AVAILABLE: logger.error("txtai is not available. Please install with: pip install txtai[pipeline,similarity]") @@ -96,7 +97,7 @@ class TxtaiIntelligenceService: logger.info("Embeddings instance created successfully") # Check if existing index exists and load it - if os.path.exists(self.index_path): + if load_existing_index and os.path.exists(self.index_path): logger.info(f"Loading existing txtai index from {self.index_path}") try: self.embeddings.load(self.index_path) @@ -119,9 +120,15 @@ class TxtaiIntelligenceService: "gpu": False, "limit": 1000 }) - else: + elif load_existing_index: logger.info(f"No existing index found. Creating new txtai index for user {self.user_id}") + else: + logger.info( + f"Skipping existing txtai index load for user {self.user_id} " + f"(backend={self._backend}, load_existing_index={load_existing_index})" + ) + self._disable_ann_queries = False self._initialized = True logger.info(f"Txtai Intelligence Service initialized successfully for user {self.user_id}") @@ -134,6 +141,45 @@ class TxtaiIntelligenceService: logger.error("3. Missing dependencies - try: pip install txtai[pipeline,similarity]") self._initialized = False + @staticmethod + def _is_nprobe_incompatibility(error: Exception) -> bool: + """Detect known FAISS IndexIDMap/nprobe incompatibility.""" + message = str(error) + return "nprobe" in message and "IndexIDMap" in message + + def _mark_ann_incompatible(self): + """Disable ANN-dependent code paths after FAISS nprobe incompatibility is observed.""" + if not self._disable_ann_queries: + logger.warning( + f"Disabling ANN-dependent txtai queries for user {self.user_id} due to IndexIDMap/nprobe incompatibility" + ) + self._disable_ann_queries = True + + def _search_with_ann_fallback(self, query: str, limit: int, graph: bool = False): + """Run search with ANN when available, then fall back to scan search when needed.""" + try: + if self._disable_ann_queries: + return self.embeddings.search(query, limit=limit, graph=graph, index=False) + return self.embeddings.search(query, limit=limit, graph=graph) + except AttributeError as ae: + if not self._is_nprobe_incompatibility(ae): + raise ae + + self._mark_ann_incompatible() + return self.embeddings.search(query, limit=limit, graph=graph, index=False) + + @staticmethod + def _cosine_similarity_from_vectors(v1, v2) -> float: + """Compute cosine similarity for two embedding vectors.""" + import math + + dot_product = sum(a * b for a, b in zip(v1, v2)) + norm_v1 = math.sqrt(sum(a * a for a in v1)) + norm_v2 = math.sqrt(sum(b * b for b in v2)) + if norm_v1 == 0 or norm_v2 == 0: + return 0.0 + return dot_product / (norm_v1 * norm_v2) + async def index_content(self, items: List[Tuple[str, str, Dict[str, Any]]]): """ Index content for semantic search and clustering. @@ -208,21 +254,7 @@ class TxtaiIntelligenceService: logger.debug(f"Cache miss for search query: '{query}'") logger.debug(f"Searching for query: '{query}' with limit: {limit}") - try: - results = self.embeddings.search(query, limit=limit) - except AttributeError as ae: - if "nprobe" in str(ae): - logger.error(f"Detected known txtai/faiss IndexIDMap/nprobe incompatibility for user {self.user_id}. Attempting re-init with numpy backend fallback...") - # Switch to numpy backend which doesn't have this issue - self._backend = "numpy" - self._initialized = False - self._initialize_embeddings() - if self.embeddings: - results = self.embeddings.search(query, limit=limit) - else: - raise ae - else: - raise ae + results = self._search_with_ann_fallback(query, limit=limit) # Cache the results if caching is enabled if self.enable_caching and self.cache_manager and results: @@ -266,21 +298,27 @@ class TxtaiIntelligenceService: logger.debug(f"Cache miss for similarity calculation") logger.debug(f"Calculating similarity between texts: '{text1[:50]}...' and '{text2[:50]}...'") - try: - similarity = self.embeddings.similarity(text1, text2) - except AttributeError as ae: - if "nprobe" in str(ae): - logger.error(f"Detected IndexIDMap nprobe error in similarity for user {self.user_id}. Falling back to numpy backend...") - # Switch to numpy backend which doesn't have this issue - self._backend = "numpy" - self._initialized = False - self._initialize_embeddings() - if self.embeddings: - similarity = self.embeddings.similarity(text1, text2) + if self._disable_ann_queries: + vectors = self.embeddings.transform([text1, text2]) + if vectors is None or len(vectors) < 2: + return 0.0 + similarity = self._cosine_similarity_from_vectors(vectors[0], vectors[1]) + else: + try: + similarity = self.embeddings.similarity(text1, text2) + except AttributeError as ae: + if self._is_nprobe_incompatibility(ae): + logger.error( + f"Detected IndexIDMap nprobe error in similarity for user {self.user_id}. " + f"Using vector cosine fallback." + ) + self._mark_ann_incompatible() + vectors = self.embeddings.transform([text1, text2]) + if vectors is None or len(vectors) < 2: + return 0.0 + similarity = self._cosine_similarity_from_vectors(vectors[0], vectors[1]) else: raise ae - else: - raise ae # Cache the similarity result if self.enable_caching and self.cache_manager: @@ -336,22 +374,7 @@ class TxtaiIntelligenceService: # Use graph-based clustering if available # Perform a search to get graph structure sample_query = "content marketing digital strategy" - try: - graph_results = self.embeddings.search(sample_query, limit=10, graph=True) - except AttributeError as ae: - if "nprobe" in str(ae): - logger.error(f"Detected IndexIDMap nprobe error in cluster for user {self.user_id}. Falling back to numpy backend...") - # Force re-initialization with numpy backend to bypass FAISS issue - self._backend = "numpy" - self._initialized = False - self._initialize_embeddings() - if self.embeddings: - # Retry with numpy backend (no graph support, so fallback) - return await self._fallback_clustering(min_score) - else: - raise ae - else: - raise ae + graph_results = self._search_with_ann_fallback(sample_query, limit=10, graph=True) if not graph_results: logger.warning(f"No graph results for clustering user {self.user_id}")