Merge branch 'pr-403'

This commit is contained in:
ajaysi
2026-03-09 16:05:18 +05:30

View File

@@ -45,6 +45,7 @@ class TxtaiIntelligenceService:
self.enable_caching = enable_caching
self.cache_manager = semantic_cache_manager if enable_caching else None
self._backend = "faiss" # Default backend
self._disable_ann_queries = False # Set when FAISS nprobe incompatibility is detected
# Mark as initialized for singleton pattern
self._singleton_initialized = True
@@ -57,7 +58,7 @@ class TxtaiIntelligenceService:
if not self._initialized:
self._initialize_embeddings()
def _initialize_embeddings(self):
def _initialize_embeddings(self, load_existing_index: bool = True):
"""Initialize txtai embeddings with local storage support and comprehensive error handling."""
if not TXTAI_AVAILABLE:
logger.error("txtai is not available. Please install with: pip install txtai[pipeline,similarity]")
@@ -96,7 +97,7 @@ class TxtaiIntelligenceService:
logger.info("Embeddings instance created successfully")
# Check if existing index exists and load it
if os.path.exists(self.index_path):
if load_existing_index and os.path.exists(self.index_path):
logger.info(f"Loading existing txtai index from {self.index_path}")
try:
self.embeddings.load(self.index_path)
@@ -119,9 +120,15 @@ class TxtaiIntelligenceService:
"gpu": False,
"limit": 1000
})
else:
elif load_existing_index:
logger.info(f"No existing index found. Creating new txtai index for user {self.user_id}")
else:
logger.info(
f"Skipping existing txtai index load for user {self.user_id} "
f"(backend={self._backend}, load_existing_index={load_existing_index})"
)
self._disable_ann_queries = False
self._initialized = True
logger.info(f"Txtai Intelligence Service initialized successfully for user {self.user_id}")
@@ -134,6 +141,45 @@ class TxtaiIntelligenceService:
logger.error("3. Missing dependencies - try: pip install txtai[pipeline,similarity]")
self._initialized = False
@staticmethod
def _is_nprobe_incompatibility(error: Exception) -> bool:
"""Detect known FAISS IndexIDMap/nprobe incompatibility."""
message = str(error)
return "nprobe" in message and "IndexIDMap" in message
def _mark_ann_incompatible(self):
"""Disable ANN-dependent code paths after FAISS nprobe incompatibility is observed."""
if not self._disable_ann_queries:
logger.warning(
f"Disabling ANN-dependent txtai queries for user {self.user_id} due to IndexIDMap/nprobe incompatibility"
)
self._disable_ann_queries = True
def _search_with_ann_fallback(self, query: str, limit: int, graph: bool = False):
"""Run search with ANN when available, then fall back to scan search when needed."""
try:
if self._disable_ann_queries:
return self.embeddings.search(query, limit=limit, graph=graph, index=False)
return self.embeddings.search(query, limit=limit, graph=graph)
except AttributeError as ae:
if not self._is_nprobe_incompatibility(ae):
raise ae
self._mark_ann_incompatible()
return self.embeddings.search(query, limit=limit, graph=graph, index=False)
@staticmethod
def _cosine_similarity_from_vectors(v1, v2) -> float:
"""Compute cosine similarity for two embedding vectors."""
import math
dot_product = sum(a * b for a, b in zip(v1, v2))
norm_v1 = math.sqrt(sum(a * a for a in v1))
norm_v2 = math.sqrt(sum(b * b for b in v2))
if norm_v1 == 0 or norm_v2 == 0:
return 0.0
return dot_product / (norm_v1 * norm_v2)
async def index_content(self, items: List[Tuple[str, str, Dict[str, Any]]]):
"""
Index content for semantic search and clustering.
@@ -208,21 +254,7 @@ class TxtaiIntelligenceService:
logger.debug(f"Cache miss for search query: '{query}'")
logger.debug(f"Searching for query: '{query}' with limit: {limit}")
try:
results = self.embeddings.search(query, limit=limit)
except AttributeError as ae:
if "nprobe" in str(ae):
logger.error(f"Detected known txtai/faiss IndexIDMap/nprobe incompatibility for user {self.user_id}. Attempting re-init with numpy backend fallback...")
# Switch to numpy backend which doesn't have this issue
self._backend = "numpy"
self._initialized = False
self._initialize_embeddings()
if self.embeddings:
results = self.embeddings.search(query, limit=limit)
else:
raise ae
else:
raise ae
results = self._search_with_ann_fallback(query, limit=limit)
# Cache the results if caching is enabled
if self.enable_caching and self.cache_manager and results:
@@ -266,21 +298,27 @@ class TxtaiIntelligenceService:
logger.debug(f"Cache miss for similarity calculation")
logger.debug(f"Calculating similarity between texts: '{text1[:50]}...' and '{text2[:50]}...'")
try:
similarity = self.embeddings.similarity(text1, text2)
except AttributeError as ae:
if "nprobe" in str(ae):
logger.error(f"Detected IndexIDMap nprobe error in similarity for user {self.user_id}. Falling back to numpy backend...")
# Switch to numpy backend which doesn't have this issue
self._backend = "numpy"
self._initialized = False
self._initialize_embeddings()
if self.embeddings:
similarity = self.embeddings.similarity(text1, text2)
if self._disable_ann_queries:
vectors = self.embeddings.transform([text1, text2])
if vectors is None or len(vectors) < 2:
return 0.0
similarity = self._cosine_similarity_from_vectors(vectors[0], vectors[1])
else:
try:
similarity = self.embeddings.similarity(text1, text2)
except AttributeError as ae:
if self._is_nprobe_incompatibility(ae):
logger.error(
f"Detected IndexIDMap nprobe error in similarity for user {self.user_id}. "
f"Using vector cosine fallback."
)
self._mark_ann_incompatible()
vectors = self.embeddings.transform([text1, text2])
if vectors is None or len(vectors) < 2:
return 0.0
similarity = self._cosine_similarity_from_vectors(vectors[0], vectors[1])
else:
raise ae
else:
raise ae
# Cache the similarity result
if self.enable_caching and self.cache_manager:
@@ -336,22 +374,7 @@ class TxtaiIntelligenceService:
# Use graph-based clustering if available
# Perform a search to get graph structure
sample_query = "content marketing digital strategy"
try:
graph_results = self.embeddings.search(sample_query, limit=10, graph=True)
except AttributeError as ae:
if "nprobe" in str(ae):
logger.error(f"Detected IndexIDMap nprobe error in cluster for user {self.user_id}. Falling back to numpy backend...")
# Force re-initialization with numpy backend to bypass FAISS issue
self._backend = "numpy"
self._initialized = False
self._initialize_embeddings()
if self.embeddings:
# Retry with numpy backend (no graph support, so fallback)
return await self._fallback_clustering(min_score)
else:
raise ae
else:
raise ae
graph_results = self._search_with_ann_fallback(sample_query, limit=10, graph=True)
if not graph_results:
logger.warning(f"No graph results for clustering user {self.user_id}")