Make SIF fail fast and add low-cost remote LLM fallback

This commit is contained in:
ي
2026-03-09 15:38:03 +05:30
committed by ajaysi
parent 651bd2b5f0
commit 4230385e70
7 changed files with 224 additions and 66 deletions

View File

@@ -47,7 +47,7 @@ logger = get_service_logger(__name__)
class AgentTeamConfiguration: class AgentTeamConfiguration:
"""Configuration for the complete agent team""" """Configuration for the complete agent team"""
user_id: str user_id: str
shared_llm: str = "Qwen/Qwen2.5-3B-Instruct" # Updated to a stable model known for text-generation shared_llm: str = "Qwen/Qwen2.5-1.5B-Instruct" # Reduced default memory footprint for local environments
max_iterations: int = 15 max_iterations: int = 15
enable_safety: bool = True enable_safety: bool = True
enable_performance_monitoring: bool = True enable_performance_monitoring: bool = True

View File

@@ -40,10 +40,17 @@ from services.intelligence.monitoring.semantic_dashboard import RealTimeSemantic
from services.intelligence.agents.safety_framework import get_safety_framework from services.intelligence.agents.safety_framework import get_safety_framework
from services.agent_activity_service import AgentActivityService, build_agent_event_payload from services.agent_activity_service import AgentActivityService, build_agent_event_payload
from services.intelligence.agents.agent_usage_tracking import track_agent_usage_sync from services.intelligence.agents.agent_usage_tracking import track_agent_usage_sync
from services.llm_providers.main_text_generation import llm_text_gen
import time import time
logger = get_service_logger(__name__) logger = get_service_logger(__name__)
LOW_COST_REMOTE_MODELS = [
"Qwen/Qwen2.5-1.5B-Instruct",
"Qwen/Qwen2.5-0.5B-Instruct",
"TinyLlama/TinyLlama-1.1B-Chat-v1.0",
]
class TrackingLLMWrapper: class TrackingLLMWrapper:
""" """
Wrapper for LLM instances to transparently track usage. Wrapper for LLM instances to transparently track usage.
@@ -169,7 +176,7 @@ class BaseALwrityAgent(ABC):
_prompt_context_cache: Dict[str, Dict[str, Any]] = {} _prompt_context_cache: Dict[str, Dict[str, Any]] = {}
_profile_cache: Dict[str, Dict[str, Any]] = {} _profile_cache: Dict[str, Dict[str, Any]] = {}
def __init__(self, user_id: str, agent_type: str, model_name: str = "Qwen/Qwen3-4B-Instruct-2507", llm: Any = None, enable_tracing: bool = True, **kwargs): def __init__(self, user_id: str, agent_type: str, model_name: str = "Qwen/Qwen2.5-1.5B-Instruct", llm: Any = None, enable_tracing: bool = True, **kwargs):
self.user_id = user_id self.user_id = user_id
self.agent_type = agent_type self.agent_type = agent_type
self.model_name = model_name self.model_name = model_name
@@ -295,7 +302,8 @@ class BaseALwrityAgent(ABC):
Centralized method for all agents inheriting from BaseALwrityAgent. Centralized method for all agents inheriting from BaseALwrityAgent.
""" """
if not self.llm: if not self.llm:
return "[LLM Unavailable]" logger.error("LLM unavailable for agent %s (%s)", self.agent_type, self.agent_id)
raise RuntimeError(f"LLM unavailable for agent {self.agent_type}")
try: try:
# Run in executor to avoid blocking if LLM is synchronous # Run in executor to avoid blocking if LLM is synchronous
@@ -319,7 +327,37 @@ class BaseALwrityAgent(ABC):
except Exception as e: except Exception as e:
logger.error(f"LLM generation failed in agent {self.agent_type}: {e}") logger.error(f"LLM generation failed in agent {self.agent_type}: {e}")
return "[Generation Failed]" logger.warning(
"Attempting remote low-cost fallback via llm_text_gen for agent %s (user=%s)",
self.agent_type,
self.user_id,
)
try:
loop = asyncio.get_event_loop()
fallback_response = await loop.run_in_executor(
None,
lambda: llm_text_gen(
prompt=prompt,
user_id=self.user_id,
preferred_hf_models=LOW_COST_REMOTE_MODELS,
),
)
logger.warning(
"Remote low-cost fallback succeeded for agent %s (user=%s)",
self.agent_type,
self.user_id,
)
return fallback_response
except Exception as remote_e:
logger.error(
"Remote fallback failed for agent %s (user=%s): %s",
self.agent_type,
self.user_id,
remote_e,
)
raise RuntimeError(
f"Local and remote LLM generation failed for agent {self.agent_type}: {remote_e}"
) from remote_e
def _resolve_agent_key(self, agent_type: str) -> str: def _resolve_agent_key(self, agent_type: str) -> str:
value = str(agent_type or "").strip() value = str(agent_type or "").strip()
@@ -524,7 +562,7 @@ class BaseALwrityAgent(ABC):
result = await loop.run_in_executor(None, self.txtai_agent, prompt) result = await loop.run_in_executor(None, self.txtai_agent, prompt)
if not self.txtai_agent: if not self.txtai_agent:
result = "Agent not initialized" raise RuntimeError(f"Agent {self.agent_id} not initialized (txtai_agent missing)")
if activity and run_record: if activity and run_record:
activity.log_event( activity.log_event(
@@ -848,19 +886,15 @@ class BaseALwrityAgent(ABC):
raise e raise e
async def _execute_fallback(self, action: AgentAction) -> str: async def _execute_fallback(self, action: AgentAction) -> str:
"""Execute fallback action when txtai is not available""" """Fail-fast instead of returning mock fallback output."""
# Simulate agent processing for development logger.error(
logger.info(f"Executing fallback action: {action.action_type}") "Fallback execution requested for action '%s' on agent %s. Failing fast to avoid mock output.",
action.action_type,
# Return simulated result based on action type self.agent_id,
if action.action_type == "analyze_competitor": )
return "Competitor analysis completed (fallback mode)" raise RuntimeError(
elif action.action_type == "optimize_content": f"Fallback execution is disabled for SIF reliability. Agent={self.agent_id}, action={action.action_type}"
return "Content optimization completed (fallback mode)" )
elif action.action_type == "fix_seo_issue":
return "SEO issue fixed (fallback mode)"
else:
return f"Action {action.action_type} completed (fallback mode)"
def _prepare_agent_prompt(self, action: AgentAction) -> str: def _prepare_agent_prompt(self, action: AgentAction) -> str:
"""Prepare prompt for txtai agent""" """Prepare prompt for txtai agent"""

View File

@@ -29,7 +29,7 @@ except ImportError:
logger.warning("txtai not available, using fallback implementation") logger.warning("txtai not available, using fallback implementation")
class SIFBaseAgent(BaseALwrityAgent): class SIFBaseAgent(BaseALwrityAgent):
def __init__(self, intelligence_service: TxtaiIntelligenceService, user_id: str, agent_type: str = "sif_agent", model_name: str = "Qwen/Qwen2.5-3B-Instruct", llm: Any = None, **kwargs): def __init__(self, intelligence_service: TxtaiIntelligenceService, user_id: str, agent_type: str = "sif_agent", model_name: str = "Qwen/Qwen2.5-1.5B-Instruct", llm: Any = None, **kwargs):
# Hybrid LLM Strategy: # Hybrid LLM Strategy:
# 1. Shared LLM for external/high-quality generation # 1. Shared LLM for external/high-quality generation
self.shared_llm = SharedLLMWrapper(user_id) self.shared_llm = SharedLLMWrapper(user_id)

View File

@@ -44,6 +44,12 @@ class SharedLLMWrapper:
_local_llm_cache = {} _local_llm_cache = {}
LOCAL_LLM_FALLBACKS = [
"Qwen/Qwen2.5-1.5B-Instruct",
"Qwen/Qwen2.5-0.5B-Instruct",
"TinyLlama/TinyLlama-1.1B-Chat-v1.0",
]
class LocalLLMWrapper: class LocalLLMWrapper:
""" """
Lazily loads a local LLM via txtai and caches it globally. Lazily loads a local LLM via txtai and caches it globally.
@@ -72,22 +78,56 @@ class LocalLLMWrapper:
if task_to_use == "text-generation": if task_to_use == "text-generation":
task_to_use = "language-generation" task_to_use = "language-generation"
logger.info(f"Loading local LLM (singleton): {self.model_path} (task={task_to_use})") candidate_models = []
try: for candidate in [self.model_path, *LOCAL_LLM_FALLBACKS]:
_local_llm_cache[cache_key] = LLM(path=self.model_path, task=task_to_use) if candidate not in candidate_models:
except Exception as e: candidate_models.append(candidate)
last_error = None
for candidate_model in candidate_models:
candidate_key = f"{candidate_model}:{self.task}"
if candidate_key in _local_llm_cache:
if candidate_model != self.model_path:
logger.warning(f"Using cached fallback local LLM model: {candidate_model}")
return _local_llm_cache[candidate_key]
logger.info(f"Loading local LLM (singleton): {candidate_model} (task={task_to_use})")
try: try:
import transformers _local_llm_cache[candidate_key] = LLM(path=candidate_model, task=task_to_use)
from transformers.pipelines import SUPPORTED_TASKS if candidate_model != self.model_path:
logger.error( logger.warning(
f"LocalLLMWrapper init failed (model={self.model_path}, requested_task={task_to_use}, " f"Loaded fallback local LLM model '{candidate_model}' after failure on '{self.model_path}'"
f"transformers={getattr(transformers, '__version__', 'unknown')}, " )
f"supported_tasks={sorted(list(SUPPORTED_TASKS.keys()))[:50]})" return _local_llm_cache[candidate_key]
except Exception as e:
last_error = e
message = str(e).lower()
is_memory_issue = (
"paging file is too small" in message
or "os error 1455" in message
or "out of memory" in message
or "not enough memory" in message
) )
except Exception: if is_memory_issue:
pass logger.warning(
logger.error(f"Failed to initialize LocalLLMWrapper: {e}") f"Local LLM memory load failure for '{candidate_model}', trying smaller fallback. Error: {e}"
raise e )
continue
logger.warning(f"Local LLM load failed for '{candidate_model}', trying next fallback. Error: {e}")
continue
try:
import transformers
from transformers.pipelines import SUPPORTED_TASKS
logger.error(
f"LocalLLMWrapper init failed (model={self.model_path}, requested_task={task_to_use}, "
f"transformers={getattr(transformers, '__version__', 'unknown')}, "
f"supported_tasks={sorted(list(SUPPORTED_TASKS.keys()))[:50]})"
)
except Exception:
pass
logger.error(f"Failed to initialize LocalLLMWrapper after fallback attempts: {last_error}")
raise last_error
return _local_llm_cache[cache_key] return _local_llm_cache[cache_key]
@@ -98,7 +138,7 @@ class LocalLLMWrapper:
return self.llm(prompt, **kwargs) return self.llm(prompt, **kwargs)
class SIFBaseAgent(BaseALwrityAgent): class SIFBaseAgent(BaseALwrityAgent):
def __init__(self, intelligence_service: TxtaiIntelligenceService, user_id: str, agent_type: str = "sif_agent", model_name: str = "Qwen/Qwen2.5-3B-Instruct", llm: Any = None): def __init__(self, intelligence_service: TxtaiIntelligenceService, user_id: str, agent_type: str = "sif_agent", model_name: str = "Qwen/Qwen2.5-1.5B-Instruct", llm: Any = None):
# Hybrid LLM Strategy: # Hybrid LLM Strategy:
# 1. Shared LLM for external/high-quality generation (available to all agents) # 1. Shared LLM for external/high-quality generation (available to all agents)
self.shared_llm = SharedLLMWrapper(user_id) self.shared_llm = SharedLLMWrapper(user_id)

View File

@@ -54,6 +54,7 @@ class TxtaiIntelligenceService:
self.cache_manager = semantic_cache_manager if enable_caching else None self.cache_manager = semantic_cache_manager if enable_caching else None
self._backend = "faiss" # Default backend self._backend = "faiss" # Default backend
self._disable_ann_queries = False # Set when FAISS nprobe incompatibility is detected self._disable_ann_queries = False # Set when FAISS nprobe incompatibility is detected
self.fail_fast = str(os.getenv("SIF_FAIL_FAST", "true")).lower() in {"1", "true", "yes", "on"}
# Mark as initialized for singleton pattern # Mark as initialized for singleton pattern
self._singleton_initialized = True self._singleton_initialized = True
@@ -226,6 +227,7 @@ class TxtaiIntelligenceService:
Args: Args:
items: List of (id, text, metadata) tuples. items: List of (id, text, metadata) tuples.
""" """
<<<<<<< HEAD
# Check if already initialized # Check if already initialized
if not self._initialized and not self._initialization_in_progress: if not self._initialized and not self._initialization_in_progress:
# Trigger initialization in background (non-blocking) # Trigger initialization in background (non-blocking)
@@ -241,6 +243,14 @@ class TxtaiIntelligenceService:
if not self.embeddings: if not self.embeddings:
logger.error(f"Cannot index content - embeddings not available for user {self.user_id}") logger.error(f"Cannot index content - embeddings not available for user {self.user_id}")
=======
self._ensure_initialized()
if not self._initialized or not self.embeddings:
message = f"Cannot index content - service not initialized for user {self.user_id}"
logger.error(message)
if self.fail_fast:
raise RuntimeError(message)
>>>>>>> 8b0547c (Make SIF fail fast and add low-cost remote LLM fallback)
return return
try: try:
@@ -287,7 +297,10 @@ class TxtaiIntelligenceService:
"""Perform semantic search with intelligent caching.""" """Perform semantic search with intelligent caching."""
self._ensure_initialized() self._ensure_initialized()
if not self._initialized or not self.embeddings: if not self._initialized or not self.embeddings:
logger.error(f"Cannot perform search - service not initialized for user {self.user_id}") message = f"Cannot perform search - service not initialized for user {self.user_id}"
logger.error(message)
if self.fail_fast:
raise RuntimeError(message)
return [] return []
try: try:
@@ -321,6 +334,8 @@ class TxtaiIntelligenceService:
return results return results
except Exception as e: except Exception as e:
logger.error(f"Search failed for user {self.user_id}: {e}") logger.error(f"Search failed for user {self.user_id}: {e}")
if self.fail_fast:
raise
logger.error(f"Query: '{query}'") logger.error(f"Query: '{query}'")
logger.error(f"Full traceback: {traceback.format_exc()}") logger.error(f"Full traceback: {traceback.format_exc()}")
return [] return []

View File

@@ -82,11 +82,29 @@ from tenacity import (
try: try:
from openai import OpenAI from openai import OpenAI
from openai import NotFoundError
OPENAI_AVAILABLE = True OPENAI_AVAILABLE = True
except ImportError: except ImportError:
OPENAI_AVAILABLE = False OPENAI_AVAILABLE = False
NotFoundError = Exception
logger.warn("OpenAI library not available. Install with: pip install openai") logger.warn("OpenAI library not available. Install with: pip install openai")
HF_FALLBACK_MODELS = [
"openai/gpt-oss-120b:groq",
"moonshotai/Kimi-K2-Instruct-0905:groq",
"meta-llama/Llama-3.1-8B-Instruct:groq",
"mistralai/Mistral-7B-Instruct-v0.3:groq",
]
def _fallback_model_sequence(model: str):
sequence = [model] + HF_FALLBACK_MODELS
seen = set()
for candidate in sequence:
if candidate and candidate not in seen:
seen.add(candidate)
yield candidate
def get_huggingface_api_key() -> str: def get_huggingface_api_key() -> str:
"""Get Hugging Face API key with proper error handling.""" """Get Hugging Face API key with proper error handling."""
api_key = os.getenv('HF_TOKEN') api_key = os.getenv('HF_TOKEN')
@@ -197,14 +215,27 @@ def huggingface_text_response(
import time import time
time.sleep(1) # 1 second delay between API calls time.sleep(1) # 1 second delay between API calls
# Make the API call using Chat Completions response = None
response = client.chat.completions.create( last_error = None
model=model, for candidate_model in _fallback_model_sequence(model):
messages=messages, try:
temperature=temperature, response = client.chat.completions.create(
top_p=top_p, model=candidate_model,
max_tokens=max_tokens messages=messages,
) temperature=temperature,
top_p=top_p,
max_tokens=max_tokens
)
if candidate_model != model:
logger.warning("HF text generation switched to fallback model: %s", candidate_model)
break
except NotFoundError as nf_err:
last_error = nf_err
logger.warning("HF model not found: %s. Trying fallback model.", candidate_model)
continue
if response is None:
raise last_error or Exception("Hugging Face text generation failed: all fallback models failed")
# Extract text from response # Extract text from response
generated_text = response.choices[0].message.content generated_text = response.choices[0].message.content
@@ -338,13 +369,27 @@ def huggingface_structured_json_response(
time.sleep(1) # 1 second delay between API calls time.sleep(1) # 1 second delay between API calls
try: try:
response = client.chat.completions.create( response = None
model=model, last_error = None
messages=messages, for candidate_model in _fallback_model_sequence(model):
temperature=temperature, try:
max_tokens=max_tokens, response = client.chat.completions.create(
response_format={"type": "json_object"} # Try to enforce JSON mode if supported model=candidate_model,
) messages=messages,
temperature=temperature,
max_tokens=max_tokens,
response_format={"type": "json_object"} # Try to enforce JSON mode if supported
)
if candidate_model != model:
logger.warning("HF structured generation switched to fallback model: %s", candidate_model)
break
except NotFoundError as nf_err:
last_error = nf_err
logger.warning("HF structured model not found: %s. Trying fallback model.", candidate_model)
continue
if response is None:
raise last_error or Exception("Hugging Face structured generation failed: all fallback models failed")
response_text = response.choices[0].message.content response_text = response.choices[0].message.content
@@ -379,14 +424,28 @@ def huggingface_structured_json_response(
except Exception as e: except Exception as e:
logger.error(f"❌ Hugging Face API call failed: {e}") logger.error(f"❌ Hugging Face API call failed: {e}")
# If 422 Unprocessable Entity (often due to response_format not supported), retry without it # If 422 Unprocessable Entity (often due to response_format not supported), retry without it
if "422" in str(e) or "not supported" in str(e).lower(): if "422" in str(e) or "not supported" in str(e).lower() or isinstance(e, NotFoundError):
logger.info("Retrying without response_format...") logger.info("Retrying without response_format...")
response = client.chat.completions.create( response = None
model=model, last_error = None
messages=messages, for candidate_model in _fallback_model_sequence(model):
temperature=temperature, try:
max_tokens=max_tokens response = client.chat.completions.create(
) model=candidate_model,
messages=messages,
temperature=temperature,
max_tokens=max_tokens
)
if candidate_model != model:
logger.warning("HF structured no-response_format fallback model: %s", candidate_model)
break
except NotFoundError as nf_err:
last_error = nf_err
logger.warning("HF structured model not found (no response_format path): %s", candidate_model)
continue
if response is None:
raise last_error or e
response_text = response.choices[0].message.content response_text = response.choices[0].message.content
# ... (same parsing logic would apply, simplified here for brevity) # ... (same parsing logic would apply, simplified here for brevity)
try: try:

View File

@@ -6,7 +6,7 @@ migrated from the legacy lib/gpt_providers/text_generation/main_text_generation.
import os import os
import json import json
from typing import Optional, Dict, Any from typing import Optional, Dict, Any, List
from datetime import datetime from datetime import datetime
from loguru import logger from loguru import logger
from fastapi import HTTPException from fastapi import HTTPException
@@ -16,7 +16,13 @@ from .gemini_provider import gemini_text_response, gemini_structured_json_respon
from .huggingface_provider import huggingface_text_response, huggingface_structured_json_response from .huggingface_provider import huggingface_text_response, huggingface_structured_json_response
def llm_text_gen(prompt: str, system_prompt: Optional[str] = None, json_struct: Optional[Dict[str, Any]] = None, user_id: str = None) -> str: def llm_text_gen(
prompt: str,
system_prompt: Optional[str] = None,
json_struct: Optional[Dict[str, Any]] = None,
user_id: str = None,
preferred_hf_models: Optional[List[str]] = None,
) -> str:
""" """
Generate text using Language Model (LLM) based on the provided prompt. Generate text using Language Model (LLM) based on the provided prompt.
@@ -54,7 +60,7 @@ def llm_text_gen(prompt: str, system_prompt: Optional[str] = None, json_struct:
model = "gemini-2.0-flash-001" model = "gemini-2.0-flash-001"
elif env_provider in ['hf_response_api', 'huggingface', 'hf']: elif env_provider in ['hf_response_api', 'huggingface', 'hf']:
gpt_provider = "huggingface" gpt_provider = "huggingface"
model = "mistralai/Mistral-7B-Instruct-v0.3" model = "mistralai/Mistral-7B-Instruct-v0.3:groq"
# Default blog characteristics # Default blog characteristics
blog_tone = "Professional" blog_tone = "Professional"
@@ -80,7 +86,7 @@ def llm_text_gen(prompt: str, system_prompt: Optional[str] = None, json_struct:
model = "gemini-2.0-flash-001" model = "gemini-2.0-flash-001"
elif "huggingface" in available_providers: elif "huggingface" in available_providers:
gpt_provider = "huggingface" gpt_provider = "huggingface"
model = "mistralai/Mistral-7B-Instruct-v0.3" model = "mistralai/Mistral-7B-Instruct-v0.3:groq"
else: else:
logger.error("[llm_text_gen] No API keys found for supported providers.") logger.error("[llm_text_gen] No API keys found for supported providers.")
raise RuntimeError("No LLM API keys configured. Configure GEMINI_API_KEY or HF_TOKEN to enable AI responses.") raise RuntimeError("No LLM API keys configured. Configure GEMINI_API_KEY or HF_TOKEN to enable AI responses.")
@@ -93,9 +99,13 @@ def llm_text_gen(prompt: str, system_prompt: Optional[str] = None, json_struct:
model = "gemini-2.0-flash-001" model = "gemini-2.0-flash-001"
elif "huggingface" in available_providers: elif "huggingface" in available_providers:
gpt_provider = "huggingface" gpt_provider = "huggingface"
model = "mistralai/Mistral-7B-Instruct-v0.3" model = "mistralai/Mistral-7B-Instruct-v0.3:groq"
else: else:
raise RuntimeError("No supported providers available.") raise RuntimeError("No supported providers available.")
if gpt_provider == "huggingface" and preferred_hf_models:
model = preferred_hf_models[0]
logger.info(f"[llm_text_gen] Using preferred low-cost HF model: {model}")
logger.debug(f"[llm_text_gen] Using provider: {gpt_provider}, model: {model}") logger.debug(f"[llm_text_gen] Using provider: {gpt_provider}, model: {model}")
@@ -303,7 +313,7 @@ def llm_text_gen(prompt: str, system_prompt: Optional[str] = None, json_struct:
elif fallback_provider == "huggingface": elif fallback_provider == "huggingface":
provider_enum = APIProvider.MISTRAL provider_enum = APIProvider.MISTRAL
actual_provider_name = "huggingface" actual_provider_name = "huggingface"
fallback_model = "mistralai/Mistral-7B-Instruct-v0.3" fallback_model = "mistralai/Mistral-7B-Instruct-v0.3:groq"
if fallback_provider == "google": if fallback_provider == "google":
if json_struct: if json_struct:
@@ -330,7 +340,7 @@ def llm_text_gen(prompt: str, system_prompt: Optional[str] = None, json_struct:
response_text = huggingface_structured_json_response( response_text = huggingface_structured_json_response(
prompt=prompt, prompt=prompt,
schema=json_struct, schema=json_struct,
model="mistralai/Mistral-7B-Instruct-v0.3", model="mistralai/Mistral-7B-Instruct-v0.3:groq",
temperature=temperature, temperature=temperature,
max_tokens=max_tokens, max_tokens=max_tokens,
system_prompt=system_instructions system_prompt=system_instructions
@@ -338,7 +348,7 @@ def llm_text_gen(prompt: str, system_prompt: Optional[str] = None, json_struct:
else: else:
response_text = huggingface_text_response( response_text = huggingface_text_response(
prompt=prompt, prompt=prompt,
model="mistralai/Mistral-7B-Instruct-v0.3", model="mistralai/Mistral-7B-Instruct-v0.3:groq",
temperature=temperature, temperature=temperature,
max_tokens=max_tokens, max_tokens=max_tokens,
top_p=top_p, top_p=top_p,
@@ -394,4 +404,4 @@ def get_api_key(gpt_provider: str) -> Optional[str]:
return api_key_manager.get_api_key(mapped_provider) return api_key_manager.get_api_key(mapped_provider)
except Exception as e: except Exception as e:
logger.error(f"[get_api_key] Error getting API key for {gpt_provider}: {str(e)}") logger.error(f"[get_api_key] Error getting API key for {gpt_provider}: {str(e)}")
return None return None