Fix txtai nprobe fallback to avoid reloading incompatible faiss index
This commit is contained in:
@@ -57,7 +57,7 @@ class TxtaiIntelligenceService:
|
|||||||
if not self._initialized:
|
if not self._initialized:
|
||||||
self._initialize_embeddings()
|
self._initialize_embeddings()
|
||||||
|
|
||||||
def _initialize_embeddings(self):
|
def _initialize_embeddings(self, load_existing_index: bool = True):
|
||||||
"""Initialize txtai embeddings with local storage support and comprehensive error handling."""
|
"""Initialize txtai embeddings with local storage support and comprehensive error handling."""
|
||||||
if not TXTAI_AVAILABLE:
|
if not TXTAI_AVAILABLE:
|
||||||
logger.error("txtai is not available. Please install with: pip install txtai[pipeline,similarity]")
|
logger.error("txtai is not available. Please install with: pip install txtai[pipeline,similarity]")
|
||||||
@@ -96,7 +96,7 @@ class TxtaiIntelligenceService:
|
|||||||
logger.info("Embeddings instance created successfully")
|
logger.info("Embeddings instance created successfully")
|
||||||
|
|
||||||
# Check if existing index exists and load it
|
# Check if existing index exists and load it
|
||||||
if os.path.exists(self.index_path):
|
if load_existing_index and os.path.exists(self.index_path):
|
||||||
logger.info(f"Loading existing txtai index from {self.index_path}")
|
logger.info(f"Loading existing txtai index from {self.index_path}")
|
||||||
try:
|
try:
|
||||||
self.embeddings.load(self.index_path)
|
self.embeddings.load(self.index_path)
|
||||||
@@ -119,8 +119,13 @@ class TxtaiIntelligenceService:
|
|||||||
"gpu": False,
|
"gpu": False,
|
||||||
"limit": 1000
|
"limit": 1000
|
||||||
})
|
})
|
||||||
else:
|
elif load_existing_index:
|
||||||
logger.info(f"No existing index found. Creating new txtai index for user {self.user_id}")
|
logger.info(f"No existing index found. Creating new txtai index for user {self.user_id}")
|
||||||
|
else:
|
||||||
|
logger.info(
|
||||||
|
f"Skipping existing txtai index load for user {self.user_id} "
|
||||||
|
f"(backend={self._backend}, load_existing_index={load_existing_index})"
|
||||||
|
)
|
||||||
|
|
||||||
self._initialized = True
|
self._initialized = True
|
||||||
logger.info(f"Txtai Intelligence Service initialized successfully for user {self.user_id}")
|
logger.info(f"Txtai Intelligence Service initialized successfully for user {self.user_id}")
|
||||||
@@ -134,6 +139,20 @@ class TxtaiIntelligenceService:
|
|||||||
logger.error("3. Missing dependencies - try: pip install txtai[pipeline,similarity]")
|
logger.error("3. Missing dependencies - try: pip install txtai[pipeline,similarity]")
|
||||||
self._initialized = False
|
self._initialized = False
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _is_nprobe_incompatibility(error: Exception) -> bool:
|
||||||
|
"""Detect known FAISS IndexIDMap/nprobe incompatibility."""
|
||||||
|
message = str(error)
|
||||||
|
return "nprobe" in message and "IndexIDMap" in message
|
||||||
|
|
||||||
|
def _switch_to_numpy_backend(self, load_existing_index: bool = False):
|
||||||
|
"""Switch embedding backend to numpy and reinitialize service state."""
|
||||||
|
if self._backend != "numpy":
|
||||||
|
logger.warning(f"Switching txtai backend to numpy for user {self.user_id}")
|
||||||
|
self._backend = "numpy"
|
||||||
|
self._initialized = False
|
||||||
|
self._initialize_embeddings(load_existing_index=load_existing_index)
|
||||||
|
|
||||||
async def index_content(self, items: List[Tuple[str, str, Dict[str, Any]]]):
|
async def index_content(self, items: List[Tuple[str, str, Dict[str, Any]]]):
|
||||||
"""
|
"""
|
||||||
Index content for semantic search and clustering.
|
Index content for semantic search and clustering.
|
||||||
@@ -211,14 +230,22 @@ class TxtaiIntelligenceService:
|
|||||||
try:
|
try:
|
||||||
results = self.embeddings.search(query, limit=limit)
|
results = self.embeddings.search(query, limit=limit)
|
||||||
except AttributeError as ae:
|
except AttributeError as ae:
|
||||||
if "nprobe" in str(ae):
|
if self._is_nprobe_incompatibility(ae):
|
||||||
logger.error(f"Detected known txtai/faiss IndexIDMap/nprobe incompatibility for user {self.user_id}. Attempting re-init with numpy backend fallback...")
|
logger.error(f"Detected known txtai/faiss IndexIDMap/nprobe incompatibility for user {self.user_id}. Attempting re-init with numpy backend fallback...")
|
||||||
# Switch to numpy backend which doesn't have this issue
|
# Switch to numpy backend and skip loading persisted ANN index
|
||||||
self._backend = "numpy"
|
self._switch_to_numpy_backend(load_existing_index=False)
|
||||||
self._initialized = False
|
|
||||||
self._initialize_embeddings()
|
|
||||||
if self.embeddings:
|
if self.embeddings:
|
||||||
|
try:
|
||||||
results = self.embeddings.search(query, limit=limit)
|
results = self.embeddings.search(query, limit=limit)
|
||||||
|
except AttributeError as retry_ae:
|
||||||
|
if self._is_nprobe_incompatibility(retry_ae):
|
||||||
|
logger.warning(
|
||||||
|
f"Retry still hit nprobe incompatibility for user {self.user_id}; "
|
||||||
|
f"forcing non-ANN search path."
|
||||||
|
)
|
||||||
|
results = self.embeddings.search(query, limit=limit, index=False)
|
||||||
|
else:
|
||||||
|
raise retry_ae
|
||||||
else:
|
else:
|
||||||
raise ae
|
raise ae
|
||||||
else:
|
else:
|
||||||
@@ -269,14 +296,38 @@ class TxtaiIntelligenceService:
|
|||||||
try:
|
try:
|
||||||
similarity = self.embeddings.similarity(text1, text2)
|
similarity = self.embeddings.similarity(text1, text2)
|
||||||
except AttributeError as ae:
|
except AttributeError as ae:
|
||||||
if "nprobe" in str(ae):
|
if self._is_nprobe_incompatibility(ae):
|
||||||
logger.error(f"Detected IndexIDMap nprobe error in similarity for user {self.user_id}. Falling back to numpy backend...")
|
logger.error(f"Detected IndexIDMap nprobe error in similarity for user {self.user_id}. Falling back to numpy backend...")
|
||||||
# Switch to numpy backend which doesn't have this issue
|
self._switch_to_numpy_backend(load_existing_index=False)
|
||||||
self._backend = "numpy"
|
|
||||||
self._initialized = False
|
|
||||||
self._initialize_embeddings()
|
|
||||||
if self.embeddings:
|
if self.embeddings:
|
||||||
|
try:
|
||||||
similarity = self.embeddings.similarity(text1, text2)
|
similarity = self.embeddings.similarity(text1, text2)
|
||||||
|
except AttributeError as retry_ae:
|
||||||
|
if self._is_nprobe_incompatibility(retry_ae):
|
||||||
|
logger.warning(
|
||||||
|
f"Similarity retry still hit nprobe incompatibility for user {self.user_id}; "
|
||||||
|
f"using vector cosine fallback."
|
||||||
|
)
|
||||||
|
vectors = self.embeddings.transform([text1, text2])
|
||||||
|
if vectors is None or len(vectors) < 2:
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Handle list or numpy array vectors consistently
|
||||||
|
import math
|
||||||
|
v1, v2 = vectors[0], vectors[1]
|
||||||
|
dot_product = sum(a * b for a, b in zip(v1, v2))
|
||||||
|
norm_v1 = math.sqrt(sum(a * a for a in v1))
|
||||||
|
norm_v2 = math.sqrt(sum(b * b for b in v2))
|
||||||
|
if norm_v1 == 0 or norm_v2 == 0:
|
||||||
|
similarity = 0.0
|
||||||
|
else:
|
||||||
|
similarity = dot_product / (norm_v1 * norm_v2)
|
||||||
|
except Exception as vector_error:
|
||||||
|
logger.error(f"Cosine fallback failed for user {self.user_id}: {vector_error}")
|
||||||
|
similarity = 0.0
|
||||||
|
else:
|
||||||
|
raise retry_ae
|
||||||
else:
|
else:
|
||||||
raise ae
|
raise ae
|
||||||
else:
|
else:
|
||||||
@@ -339,12 +390,10 @@ class TxtaiIntelligenceService:
|
|||||||
try:
|
try:
|
||||||
graph_results = self.embeddings.search(sample_query, limit=10, graph=True)
|
graph_results = self.embeddings.search(sample_query, limit=10, graph=True)
|
||||||
except AttributeError as ae:
|
except AttributeError as ae:
|
||||||
if "nprobe" in str(ae):
|
if self._is_nprobe_incompatibility(ae):
|
||||||
logger.error(f"Detected IndexIDMap nprobe error in cluster for user {self.user_id}. Falling back to numpy backend...")
|
logger.error(f"Detected IndexIDMap nprobe error in cluster for user {self.user_id}. Falling back to numpy backend...")
|
||||||
# Force re-initialization with numpy backend to bypass FAISS issue
|
# Force re-initialization with numpy backend to bypass FAISS issue
|
||||||
self._backend = "numpy"
|
self._switch_to_numpy_backend(load_existing_index=False)
|
||||||
self._initialized = False
|
|
||||||
self._initialize_embeddings()
|
|
||||||
if self.embeddings:
|
if self.embeddings:
|
||||||
# Retry with numpy backend (no graph support, so fallback)
|
# Retry with numpy backend (no graph support, so fallback)
|
||||||
return await self._fallback_clustering(min_score)
|
return await self._fallback_clustering(min_score)
|
||||||
|
|||||||
Reference in New Issue
Block a user