Stabilize txtai nprobe handling without dropping loaded index state
This commit is contained in:
@@ -45,6 +45,7 @@ class TxtaiIntelligenceService:
|
||||
self.enable_caching = enable_caching
|
||||
self.cache_manager = semantic_cache_manager if enable_caching else None
|
||||
self._backend = "faiss" # Default backend
|
||||
self._disable_ann_queries = False # Set when FAISS nprobe incompatibility is detected
|
||||
|
||||
# Mark as initialized for singleton pattern
|
||||
self._singleton_initialized = True
|
||||
@@ -127,6 +128,7 @@ class TxtaiIntelligenceService:
|
||||
f"(backend={self._backend}, load_existing_index={load_existing_index})"
|
||||
)
|
||||
|
||||
self._disable_ann_queries = False
|
||||
self._initialized = True
|
||||
logger.info(f"Txtai Intelligence Service initialized successfully for user {self.user_id}")
|
||||
|
||||
@@ -145,13 +147,38 @@ class TxtaiIntelligenceService:
|
||||
message = str(error)
|
||||
return "nprobe" in message and "IndexIDMap" in message
|
||||
|
||||
def _switch_to_numpy_backend(self, load_existing_index: bool = False):
|
||||
"""Switch embedding backend to numpy and reinitialize service state."""
|
||||
if self._backend != "numpy":
|
||||
logger.warning(f"Switching txtai backend to numpy for user {self.user_id}")
|
||||
self._backend = "numpy"
|
||||
self._initialized = False
|
||||
self._initialize_embeddings(load_existing_index=load_existing_index)
|
||||
def _mark_ann_incompatible(self):
|
||||
"""Disable ANN-dependent code paths after FAISS nprobe incompatibility is observed."""
|
||||
if not self._disable_ann_queries:
|
||||
logger.warning(
|
||||
f"Disabling ANN-dependent txtai queries for user {self.user_id} due to IndexIDMap/nprobe incompatibility"
|
||||
)
|
||||
self._disable_ann_queries = True
|
||||
|
||||
def _search_with_ann_fallback(self, query: str, limit: int, graph: bool = False):
|
||||
"""Run search with ANN when available, then fall back to scan search when needed."""
|
||||
try:
|
||||
if self._disable_ann_queries:
|
||||
return self.embeddings.search(query, limit=limit, graph=graph, index=False)
|
||||
return self.embeddings.search(query, limit=limit, graph=graph)
|
||||
except AttributeError as ae:
|
||||
if not self._is_nprobe_incompatibility(ae):
|
||||
raise ae
|
||||
|
||||
self._mark_ann_incompatible()
|
||||
return self.embeddings.search(query, limit=limit, graph=graph, index=False)
|
||||
|
||||
@staticmethod
|
||||
def _cosine_similarity_from_vectors(v1, v2) -> float:
|
||||
"""Compute cosine similarity for two embedding vectors."""
|
||||
import math
|
||||
|
||||
dot_product = sum(a * b for a, b in zip(v1, v2))
|
||||
norm_v1 = math.sqrt(sum(a * a for a in v1))
|
||||
norm_v2 = math.sqrt(sum(b * b for b in v2))
|
||||
if norm_v1 == 0 or norm_v2 == 0:
|
||||
return 0.0
|
||||
return dot_product / (norm_v1 * norm_v2)
|
||||
|
||||
async def index_content(self, items: List[Tuple[str, str, Dict[str, Any]]]):
|
||||
"""
|
||||
@@ -227,29 +254,7 @@ class TxtaiIntelligenceService:
|
||||
logger.debug(f"Cache miss for search query: '{query}'")
|
||||
|
||||
logger.debug(f"Searching for query: '{query}' with limit: {limit}")
|
||||
try:
|
||||
results = self.embeddings.search(query, limit=limit)
|
||||
except AttributeError as ae:
|
||||
if self._is_nprobe_incompatibility(ae):
|
||||
logger.error(f"Detected known txtai/faiss IndexIDMap/nprobe incompatibility for user {self.user_id}. Attempting re-init with numpy backend fallback...")
|
||||
# Switch to numpy backend and skip loading persisted ANN index
|
||||
self._switch_to_numpy_backend(load_existing_index=False)
|
||||
if self.embeddings:
|
||||
try:
|
||||
results = self.embeddings.search(query, limit=limit)
|
||||
except AttributeError as retry_ae:
|
||||
if self._is_nprobe_incompatibility(retry_ae):
|
||||
logger.warning(
|
||||
f"Retry still hit nprobe incompatibility for user {self.user_id}; "
|
||||
f"forcing non-ANN search path."
|
||||
)
|
||||
results = self.embeddings.search(query, limit=limit, index=False)
|
||||
else:
|
||||
raise retry_ae
|
||||
else:
|
||||
raise ae
|
||||
else:
|
||||
raise ae
|
||||
results = self._search_with_ann_fallback(query, limit=limit)
|
||||
|
||||
# Cache the results if caching is enabled
|
||||
if self.enable_caching and self.cache_manager and results:
|
||||
@@ -293,45 +298,27 @@ class TxtaiIntelligenceService:
|
||||
logger.debug(f"Cache miss for similarity calculation")
|
||||
|
||||
logger.debug(f"Calculating similarity between texts: '{text1[:50]}...' and '{text2[:50]}...'")
|
||||
try:
|
||||
similarity = self.embeddings.similarity(text1, text2)
|
||||
except AttributeError as ae:
|
||||
if self._is_nprobe_incompatibility(ae):
|
||||
logger.error(f"Detected IndexIDMap nprobe error in similarity for user {self.user_id}. Falling back to numpy backend...")
|
||||
self._switch_to_numpy_backend(load_existing_index=False)
|
||||
if self.embeddings:
|
||||
try:
|
||||
similarity = self.embeddings.similarity(text1, text2)
|
||||
except AttributeError as retry_ae:
|
||||
if self._is_nprobe_incompatibility(retry_ae):
|
||||
logger.warning(
|
||||
f"Similarity retry still hit nprobe incompatibility for user {self.user_id}; "
|
||||
f"using vector cosine fallback."
|
||||
)
|
||||
vectors = self.embeddings.transform([text1, text2])
|
||||
if vectors is None or len(vectors) < 2:
|
||||
return 0.0
|
||||
|
||||
try:
|
||||
# Handle list or numpy array vectors consistently
|
||||
import math
|
||||
v1, v2 = vectors[0], vectors[1]
|
||||
dot_product = sum(a * b for a, b in zip(v1, v2))
|
||||
norm_v1 = math.sqrt(sum(a * a for a in v1))
|
||||
norm_v2 = math.sqrt(sum(b * b for b in v2))
|
||||
if norm_v1 == 0 or norm_v2 == 0:
|
||||
similarity = 0.0
|
||||
else:
|
||||
similarity = dot_product / (norm_v1 * norm_v2)
|
||||
except Exception as vector_error:
|
||||
logger.error(f"Cosine fallback failed for user {self.user_id}: {vector_error}")
|
||||
similarity = 0.0
|
||||
else:
|
||||
raise retry_ae
|
||||
if self._disable_ann_queries:
|
||||
vectors = self.embeddings.transform([text1, text2])
|
||||
if vectors is None or len(vectors) < 2:
|
||||
return 0.0
|
||||
similarity = self._cosine_similarity_from_vectors(vectors[0], vectors[1])
|
||||
else:
|
||||
try:
|
||||
similarity = self.embeddings.similarity(text1, text2)
|
||||
except AttributeError as ae:
|
||||
if self._is_nprobe_incompatibility(ae):
|
||||
logger.error(
|
||||
f"Detected IndexIDMap nprobe error in similarity for user {self.user_id}. "
|
||||
f"Using vector cosine fallback."
|
||||
)
|
||||
self._mark_ann_incompatible()
|
||||
vectors = self.embeddings.transform([text1, text2])
|
||||
if vectors is None or len(vectors) < 2:
|
||||
return 0.0
|
||||
similarity = self._cosine_similarity_from_vectors(vectors[0], vectors[1])
|
||||
else:
|
||||
raise ae
|
||||
else:
|
||||
raise ae
|
||||
|
||||
# Cache the similarity result
|
||||
if self.enable_caching and self.cache_manager:
|
||||
@@ -387,20 +374,7 @@ class TxtaiIntelligenceService:
|
||||
# Use graph-based clustering if available
|
||||
# Perform a search to get graph structure
|
||||
sample_query = "content marketing digital strategy"
|
||||
try:
|
||||
graph_results = self.embeddings.search(sample_query, limit=10, graph=True)
|
||||
except AttributeError as ae:
|
||||
if self._is_nprobe_incompatibility(ae):
|
||||
logger.error(f"Detected IndexIDMap nprobe error in cluster for user {self.user_id}. Falling back to numpy backend...")
|
||||
# Force re-initialization with numpy backend to bypass FAISS issue
|
||||
self._switch_to_numpy_backend(load_existing_index=False)
|
||||
if self.embeddings:
|
||||
# Retry with numpy backend (no graph support, so fallback)
|
||||
return await self._fallback_clustering(min_score)
|
||||
else:
|
||||
raise ae
|
||||
else:
|
||||
raise ae
|
||||
graph_results = self._search_with_ann_fallback(sample_query, limit=10, graph=True)
|
||||
|
||||
if not graph_results:
|
||||
logger.warning(f"No graph results for clustering user {self.user_id}")
|
||||
|
||||
Reference in New Issue
Block a user