Make SIF agent workflows non-blocking and guard SSE hangs

This commit is contained in:
ي
2026-03-10 14:05:00 +05:30
parent bc49329ed6
commit 3a88d09af8
4 changed files with 82 additions and 58 deletions

View File

@@ -158,6 +158,16 @@ class SIFBaseAgent(BaseALwrityAgent):
if kwargs:
logger.debug(f"[{self.__class__.__name__}] Parameters: {kwargs}")
async def _ensure_intelligence_ready(self) -> bool:
"""Ensure txtai intelligence service is initialized without blocking the event loop."""
try:
await self.intelligence._ensure_initialized_async()
except Exception as init_err:
logger.warning(f"[{self.__class__.__name__}] Intelligence initialization failed: {init_err}")
return False
return bool(getattr(self.intelligence, "_initialized", False) and self.intelligence.embeddings)
def _create_txtai_agent(self):
"""
SIF agents primarily use the intelligence service directly, but we can expose
@@ -186,11 +196,7 @@ class StrategyArchitectAgent(SIFBaseAgent):
self._log_agent_operation("Discovering content pillars")
try:
# Check if intelligence service is initialized
if not self.intelligence.is_initialized():
logger.error(f"[{self.__class__.__name__}] Intelligence service not initialized")
return []
# Let intelligence service perform lazy async initialization internally.
clusters = await self.intelligence.cluster(min_score=0.6)
if not clusters:
@@ -370,14 +376,14 @@ class StrategyArchitectAgent(SIFBaseAgent):
async def _fetch_index_documents(self) -> List[Dict[str, Any]]:
"""Fetch indexed documents and normalize metadata from txtai result objects."""
if not self.intelligence.is_initialized() or not self.intelligence.embeddings:
if not await self._ensure_intelligence_ready():
return []
embeddings = self.intelligence.embeddings
limit = 0
if hasattr(embeddings, "count"):
try:
limit = int(embeddings.count())
limit = int(await asyncio.to_thread(embeddings.count))
except Exception:
limit = 0
@@ -394,7 +400,7 @@ class StrategyArchitectAgent(SIFBaseAgent):
for query in candidate_queries:
try:
query_limit = limit if query.startswith("select") and limit > 0 else max(10, limit or 50)
rows = embeddings.search(query, limit=query_limit)
rows = await asyncio.to_thread(lambda: embeddings.search(query, limit=query_limit))
except Exception:
continue
@@ -565,7 +571,7 @@ class ContentGuardianAgent(SIFBaseAgent):
self._log_agent_operation("Checking for semantic cannibalization", draft_length=len(new_draft))
try:
if not self.intelligence.is_initialized():
if not await self._ensure_intelligence_ready():
logger.error(f"[{self.__class__.__name__}] Intelligence service not initialized")
return {"warning": False, "error": "Service not initialized"}
@@ -796,7 +802,7 @@ class LinkGraphAgent(SIFBaseAgent):
self._log_agent_operation("Suggesting internal links", draft_length=len(draft))
try:
if not self.intelligence.is_initialized():
if not await self._ensure_intelligence_ready():
logger.error(f"[{self.__class__.__name__}] Intelligence service not initialized")
return []
@@ -876,7 +882,7 @@ class LinkGraphAgent(SIFBaseAgent):
self._log_agent_operation("Building semantic link graph")
try:
if not self.intelligence.is_initialized():
if not await self._ensure_intelligence_ready():
return {"error": "Intelligence service not initialized"}
# This is a resource-intensive operation in a real vector DB.
@@ -1002,7 +1008,7 @@ class CitationExpert(SIFBaseAgent):
self._log_agent_operation("Finding citations", topic=topic)
try:
if not self.intelligence.is_initialized():
if not await self._ensure_intelligence_ready():
return []
# Search for highly relevant content

View File

@@ -222,32 +222,15 @@ class TxtaiIntelligenceService:
async def index_content(self, items: List[Tuple[str, str, Dict[str, Any]]]):
"""
Index content for semantic search and clustering (non-blocking).
Index content for semantic search and clustering.
Args:
items: List of (id, text, metadata) tuples.
"""
<<<<<<< HEAD
# Check if already initialized
if not self._initialized and not self._initialization_in_progress:
# Trigger initialization in background (non-blocking)
self._ensure_initialized()
# Don't wait for initialization - let it happen in background
logger.debug(f"Indexing triggered for user {self.user_id}, initialization will happen in background")
return
# If initialization is still in progress, log and return without blocking
if not self._initialized:
logger.warning(f"Service not yet initialized for user {self.user_id}, indexing will retry later")
return
if not self.embeddings:
logger.error(f"Cannot index content - embeddings not available for user {self.user_id}")
=======
self._ensure_initialized()
await self._ensure_initialized_async()
if not self._initialized or not self.embeddings:
message = f"Cannot index content - service not initialized for user {self.user_id}"
logger.error(message)
logger.warning(message)
if self.fail_fast:
raise RuntimeError(message)
return
@@ -255,12 +238,12 @@ class TxtaiIntelligenceService:
try:
logger.info(f"Starting content indexing for user {self.user_id}")
logger.debug(f"Indexing {len(items)} items")
# Validate input items
if not items:
logger.warning("No items provided for indexing")
return
# Index items: [(id, text, metadata)] - metadata needs to be JSON string for txtai
import json
processed_items = []
@@ -269,19 +252,19 @@ class TxtaiIntelligenceService:
# Convert metadata dict to JSON string
metadata_json = json.dumps(metadata) if metadata else "{}"
processed_items.append((id_val, text, metadata_json))
self.embeddings.index(processed_items)
# Save the index
self.embeddings.save(self.index_path)
logger.info(f"Successfully indexed {len(items)} items for user {self.user_id}")
logger.debug(f"Index saved to: {self.index_path}")
except Exception as e:
logger.error(f"Error indexing content for user {self.user_id}: {e}")
logger.error(f"Full traceback: {traceback.format_exc()}")
logger.error(f"Items count: {len(items) if items else 0}")
message = str(e)
is_windows_lock_error = isinstance(e, PermissionError) or "WinError 32" in message
if is_windows_lock_error:
@@ -294,7 +277,7 @@ class TxtaiIntelligenceService:
async def search(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
"""Perform semantic search with intelligent caching."""
self._ensure_initialized()
await self._ensure_initialized_async()
if not self._initialized or not self.embeddings:
message = f"Cannot perform search - service not initialized for user {self.user_id}"
logger.error(message)
@@ -341,7 +324,7 @@ class TxtaiIntelligenceService:
async def get_similarity(self, text1: str, text2: str) -> float:
"""Get semantic similarity between two texts with caching."""
self._ensure_initialized()
await self._ensure_initialized_async()
if not self._initialized or not self.embeddings:
logger.error(f"Cannot calculate similarity - service not initialized for user {self.user_id}")
return 0.0
@@ -410,7 +393,7 @@ class TxtaiIntelligenceService:
async def cluster(self, min_score: float = 0.5) -> List[List[int]]:
"""Cluster indexed content to find semantic pillars using graph-based clustering with caching."""
self._ensure_initialized()
await self._ensure_initialized_async()
if not self._initialized or not self.embeddings:
logger.error(f"Cannot cluster content - service not initialized for user {self.user_id}")
return []
@@ -536,7 +519,7 @@ class TxtaiIntelligenceService:
async def classify(self, text: str, labels: List[str]) -> List[Tuple[str, float]]:
"""Classify text using zero-shot classification."""
self._ensure_initialized()
await self._ensure_initialized_async()
if not self._initialized or not Labels:
logger.error(f"Cannot classify text - service not initialized or Labels not available for user {self.user_id}")
return []