From 3a88d09af87525614098a00f4da997a4a84f03e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D9=8A?= Date: Tue, 10 Mar 2026 14:05:00 +0530 Subject: [PATCH] Make SIF agent workflows non-blocking and guard SSE hangs --- backend/services/intelligence/sif_agents.py | 30 ++++++++----- .../services/intelligence/txtai_service.py | 45 ++++++------------- .../llm_providers/huggingface_provider.py | 40 ++++++++++++----- .../services/contentPlanningOrchestrator.ts | 25 +++++++++-- 4 files changed, 82 insertions(+), 58 deletions(-) diff --git a/backend/services/intelligence/sif_agents.py b/backend/services/intelligence/sif_agents.py index 52464915..a81bd0cf 100644 --- a/backend/services/intelligence/sif_agents.py +++ b/backend/services/intelligence/sif_agents.py @@ -158,6 +158,16 @@ class SIFBaseAgent(BaseALwrityAgent): if kwargs: logger.debug(f"[{self.__class__.__name__}] Parameters: {kwargs}") + async def _ensure_intelligence_ready(self) -> bool: + """Ensure txtai intelligence service is initialized without blocking the event loop.""" + try: + await self.intelligence._ensure_initialized_async() + except Exception as init_err: + logger.warning(f"[{self.__class__.__name__}] Intelligence initialization failed: {init_err}") + return False + + return bool(getattr(self.intelligence, "_initialized", False) and self.intelligence.embeddings) + def _create_txtai_agent(self): """ SIF agents primarily use the intelligence service directly, but we can expose @@ -186,11 +196,7 @@ class StrategyArchitectAgent(SIFBaseAgent): self._log_agent_operation("Discovering content pillars") try: - # Check if intelligence service is initialized - if not self.intelligence.is_initialized(): - logger.error(f"[{self.__class__.__name__}] Intelligence service not initialized") - return [] - + # Let intelligence service perform lazy async initialization internally. clusters = await self.intelligence.cluster(min_score=0.6) if not clusters: @@ -370,14 +376,14 @@ class StrategyArchitectAgent(SIFBaseAgent): async def _fetch_index_documents(self) -> List[Dict[str, Any]]: """Fetch indexed documents and normalize metadata from txtai result objects.""" - if not self.intelligence.is_initialized() or not self.intelligence.embeddings: + if not await self._ensure_intelligence_ready(): return [] embeddings = self.intelligence.embeddings limit = 0 if hasattr(embeddings, "count"): try: - limit = int(embeddings.count()) + limit = int(await asyncio.to_thread(embeddings.count)) except Exception: limit = 0 @@ -394,7 +400,7 @@ class StrategyArchitectAgent(SIFBaseAgent): for query in candidate_queries: try: query_limit = limit if query.startswith("select") and limit > 0 else max(10, limit or 50) - rows = embeddings.search(query, limit=query_limit) + rows = await asyncio.to_thread(lambda: embeddings.search(query, limit=query_limit)) except Exception: continue @@ -565,7 +571,7 @@ class ContentGuardianAgent(SIFBaseAgent): self._log_agent_operation("Checking for semantic cannibalization", draft_length=len(new_draft)) try: - if not self.intelligence.is_initialized(): + if not await self._ensure_intelligence_ready(): logger.error(f"[{self.__class__.__name__}] Intelligence service not initialized") return {"warning": False, "error": "Service not initialized"} @@ -796,7 +802,7 @@ class LinkGraphAgent(SIFBaseAgent): self._log_agent_operation("Suggesting internal links", draft_length=len(draft)) try: - if not self.intelligence.is_initialized(): + if not await self._ensure_intelligence_ready(): logger.error(f"[{self.__class__.__name__}] Intelligence service not initialized") return [] @@ -876,7 +882,7 @@ class LinkGraphAgent(SIFBaseAgent): self._log_agent_operation("Building semantic link graph") try: - if not self.intelligence.is_initialized(): + if not await self._ensure_intelligence_ready(): return {"error": "Intelligence service not initialized"} # This is a resource-intensive operation in a real vector DB. @@ -1002,7 +1008,7 @@ class CitationExpert(SIFBaseAgent): self._log_agent_operation("Finding citations", topic=topic) try: - if not self.intelligence.is_initialized(): + if not await self._ensure_intelligence_ready(): return [] # Search for highly relevant content diff --git a/backend/services/intelligence/txtai_service.py b/backend/services/intelligence/txtai_service.py index 802e0778..37584b38 100644 --- a/backend/services/intelligence/txtai_service.py +++ b/backend/services/intelligence/txtai_service.py @@ -222,32 +222,15 @@ class TxtaiIntelligenceService: async def index_content(self, items: List[Tuple[str, str, Dict[str, Any]]]): """ - Index content for semantic search and clustering (non-blocking). - + Index content for semantic search and clustering. + Args: items: List of (id, text, metadata) tuples. """ -<<<<<<< HEAD - # Check if already initialized - if not self._initialized and not self._initialization_in_progress: - # Trigger initialization in background (non-blocking) - self._ensure_initialized() - # Don't wait for initialization - let it happen in background - logger.debug(f"Indexing triggered for user {self.user_id}, initialization will happen in background") - return - - # If initialization is still in progress, log and return without blocking - if not self._initialized: - logger.warning(f"Service not yet initialized for user {self.user_id}, indexing will retry later") - return - - if not self.embeddings: - logger.error(f"Cannot index content - embeddings not available for user {self.user_id}") -======= - self._ensure_initialized() + await self._ensure_initialized_async() if not self._initialized or not self.embeddings: message = f"Cannot index content - service not initialized for user {self.user_id}" - logger.error(message) + logger.warning(message) if self.fail_fast: raise RuntimeError(message) return @@ -255,12 +238,12 @@ class TxtaiIntelligenceService: try: logger.info(f"Starting content indexing for user {self.user_id}") logger.debug(f"Indexing {len(items)} items") - + # Validate input items if not items: logger.warning("No items provided for indexing") return - + # Index items: [(id, text, metadata)] - metadata needs to be JSON string for txtai import json processed_items = [] @@ -269,19 +252,19 @@ class TxtaiIntelligenceService: # Convert metadata dict to JSON string metadata_json = json.dumps(metadata) if metadata else "{}" processed_items.append((id_val, text, metadata_json)) - + self.embeddings.index(processed_items) - + # Save the index self.embeddings.save(self.index_path) logger.info(f"Successfully indexed {len(items)} items for user {self.user_id}") logger.debug(f"Index saved to: {self.index_path}") - + except Exception as e: logger.error(f"Error indexing content for user {self.user_id}: {e}") logger.error(f"Full traceback: {traceback.format_exc()}") logger.error(f"Items count: {len(items) if items else 0}") - + message = str(e) is_windows_lock_error = isinstance(e, PermissionError) or "WinError 32" in message if is_windows_lock_error: @@ -294,7 +277,7 @@ class TxtaiIntelligenceService: async def search(self, query: str, limit: int = 5) -> List[Dict[str, Any]]: """Perform semantic search with intelligent caching.""" - self._ensure_initialized() + await self._ensure_initialized_async() if not self._initialized or not self.embeddings: message = f"Cannot perform search - service not initialized for user {self.user_id}" logger.error(message) @@ -341,7 +324,7 @@ class TxtaiIntelligenceService: async def get_similarity(self, text1: str, text2: str) -> float: """Get semantic similarity between two texts with caching.""" - self._ensure_initialized() + await self._ensure_initialized_async() if not self._initialized or not self.embeddings: logger.error(f"Cannot calculate similarity - service not initialized for user {self.user_id}") return 0.0 @@ -410,7 +393,7 @@ class TxtaiIntelligenceService: async def cluster(self, min_score: float = 0.5) -> List[List[int]]: """Cluster indexed content to find semantic pillars using graph-based clustering with caching.""" - self._ensure_initialized() + await self._ensure_initialized_async() if not self._initialized or not self.embeddings: logger.error(f"Cannot cluster content - service not initialized for user {self.user_id}") return [] @@ -536,7 +519,7 @@ class TxtaiIntelligenceService: async def classify(self, text: str, labels: List[str]) -> List[Tuple[str, float]]: """Classify text using zero-shot classification.""" - self._ensure_initialized() + await self._ensure_initialized_async() if not self._initialized or not Labels: logger.error(f"Cannot classify text - service not initialized or Labels not available for user {self.user_id}") return [] diff --git a/backend/services/llm_providers/huggingface_provider.py b/backend/services/llm_providers/huggingface_provider.py index 5f354ca7..e1b3c762 100644 --- a/backend/services/llm_providers/huggingface_provider.py +++ b/backend/services/llm_providers/huggingface_provider.py @@ -97,13 +97,29 @@ HF_FALLBACK_MODELS = [ ] +def _candidate_model_variants(model: str): + """Yield model ids to try for a single logical model preference.""" + if not model: + return + + # Try configured model first (supports provider suffixes like ":groq") + yield model + + # Fallback to base repo id when provider suffix is not recognized by the router + if ":" in model: + base_model = model.split(":", 1)[0] + if base_model: + yield base_model + + def _fallback_model_sequence(model: str): sequence = [model] + HF_FALLBACK_MODELS seen = set() - for candidate in sequence: - if candidate and candidate not in seen: - seen.add(candidate) - yield candidate + for preferred_model in sequence: + for candidate in _candidate_model_variants(preferred_model): + if candidate and candidate not in seen: + seen.add(candidate) + yield candidate def get_huggingface_api_key() -> str: """Get Hugging Face API key with proper error handling.""" @@ -201,7 +217,7 @@ def huggingface_text_response( # Add debugging for API call logger.info( - "Hugging Face text call | model=%s | prompt_len=%s | temp=%s | top_p=%s | max_tokens=%s", + "Hugging Face text call | model={} | prompt_len={} | temp={} | top_p={} | max_tokens={}", model, len(prompt) if isinstance(prompt, str) else '', temperature, @@ -227,11 +243,11 @@ def huggingface_text_response( max_tokens=max_tokens ) if candidate_model != model: - logger.warning("HF text generation switched to fallback model: %s", candidate_model) + logger.warning("HF text generation switched to fallback model: {}", candidate_model) break except NotFoundError as nf_err: last_error = nf_err - logger.warning("HF model not found: %s. Trying fallback model.", candidate_model) + logger.warning("HF model not found: {}. Trying fallback model.", candidate_model) continue if response is None: @@ -347,7 +363,7 @@ def huggingface_structured_json_response( # Add debugging for API call logger.info( - "Hugging Face structured call | model=%s | prompt_len=%s | schema_kind=%s | temp=%s | max_tokens=%s", + "Hugging Face structured call | model={} | prompt_len={} | schema_kind={} | temp={} | max_tokens={}", model, len(prompt) if isinstance(prompt, str) else '', type(schema).__name__, @@ -381,11 +397,11 @@ def huggingface_structured_json_response( response_format={"type": "json_object"} # Try to enforce JSON mode if supported ) if candidate_model != model: - logger.warning("HF structured generation switched to fallback model: %s", candidate_model) + logger.warning("HF structured generation switched to fallback model: {}", candidate_model) break except NotFoundError as nf_err: last_error = nf_err - logger.warning("HF structured model not found: %s. Trying fallback model.", candidate_model) + logger.warning("HF structured model not found: {}. Trying fallback model.", candidate_model) continue if response is None: @@ -437,11 +453,11 @@ def huggingface_structured_json_response( max_tokens=max_tokens ) if candidate_model != model: - logger.warning("HF structured no-response_format fallback model: %s", candidate_model) + logger.warning("HF structured no-response_format fallback model: {}", candidate_model) break except NotFoundError as nf_err: last_error = nf_err - logger.warning("HF structured model not found (no response_format path): %s", candidate_model) + logger.warning("HF structured model not found (no response_format path): {}", candidate_model) continue if response is None: diff --git a/frontend/src/services/contentPlanningOrchestrator.ts b/frontend/src/services/contentPlanningOrchestrator.ts index 684b0376..0a2aae4c 100644 --- a/frontend/src/services/contentPlanningOrchestrator.ts +++ b/frontend/src/services/contentPlanningOrchestrator.ts @@ -231,6 +231,22 @@ export class ContentPlanningOrchestrator { // New approach: stream strategic intelligence data and show status from AI generation SSE return await new Promise<{ aiInsights: any[]; aiRecommendations: any[] }>(async (resolve) => { + let finished = false; + const complete = (payload: { aiInsights: any[]; aiRecommendations: any[] }) => { + if (finished) return; + finished = true; + resolve(payload); + }; + + // Hard timeout so the orchestrator never hangs if SSE never emits terminal events. + const hardTimeout = window.setTimeout(() => { + this.updateServiceStatus('aiAnalytics', { + status: 'error', + progress: 0, + message: 'Strategic intelligence timed out' + }); + complete({ aiInsights: [], aiRecommendations: [] }); + }, 45000); // 1) Execution status stream (best-effort; ignore if no active strategy) try { const currentStrategyId = this.latestDashboardData?.strategies?.[0]?.id; @@ -280,18 +296,21 @@ export class ContentPlanningOrchestrator { }); // Map to orchestrator fields if needed this.notifyDataUpdate({ aiInsights: data.data?.recommendations || [], aiRecommendations: [] }); - resolve({ aiInsights: data.data?.recommendations || [], aiRecommendations: [] }); + window.clearTimeout(hardTimeout); + complete({ aiInsights: data.data?.recommendations || [], aiRecommendations: [] }); } else if (data.type === 'error') { this.updateServiceStatus('aiAnalytics', { status: 'error', progress: 0, message: data.message || 'Failed to load strategic intelligence' }); - resolve({ aiInsights: [], aiRecommendations: [] }); + window.clearTimeout(hardTimeout); + complete({ aiInsights: [], aiRecommendations: [] }); } }, () => { - resolve({ aiInsights: [], aiRecommendations: [] }); + window.clearTimeout(hardTimeout); + complete({ aiInsights: [], aiRecommendations: [] }); } ); });